* Log when DispatchQueue throttle limit is reached.
* Display a HEALTH_WARN in the ceph status.
Fixes: https://tracker.ceph.com/issues/46226
Signed-off-by: Jos Collin <jcollin@redhat.com>
std::lock_guard l(lock);
if (_should_wait(c) || !conds.empty()) {
ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
+ failed++;
result = false;
} else {
ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
if (!conds.empty())
conds.front().notify_one();
count = 0;
+ failed = 0;
if (logger) {
logger->set(l_throttle_val, 0);
}
CephContext *cct;
const std::string name;
PerfCountersRef logger;
- std::atomic<int64_t> count = { 0 }, max = { 0 };
+ // failed: number of get_or_fail() rejections; zeroed together with count
+ std::atomic<int64_t> count = { 0 }, max = { 0 }, failed = { 0 };
std::mutex lock;
std::list<std::condition_variable> conds;
const bool use_perf;
* @returns number of requests being hold after this
*/
int64_t put(int64_t c = 1) override;
+
+ /**
+  * gets the number of slot requests that failed, i.e. were rejected
+  * by get_or_fail(); the counter is zeroed together with count.
+  * @returns the number of failed slot requests
+  */
+ int64_t get_failed() const {
+   return failed.load();
+ }
+
/**
* reset the zero to the stock
*/
fmt_desc: Throttles total size of messages waiting to be dispatched.
default: 100_M
with_legacy: true
+- name: ms_dispatch_throttle_log_interval
+ type: secs
+ level: advanced
+ desc: Minimum interval in seconds between cluster log warnings and health warnings
+ (shown in ceph status) when the dispatch throttle limit is exceeded. Setting it
+ to 0 disables both warnings.
+ default: 30
+ min: 0
- name: ms_bind_ipv4
type: bool
level: advanced
enum class daemon_metric : uint8_t {
SLOW_OPS,
PENDING_CREATING_PGS,
+ DISPATCH_QUEUE_THROTTLE, // messages rejected by the dispatch-queue throttler
NONE,
};
switch (t) {
case daemon_metric::SLOW_OPS: return "SLOW_OPS";
case daemon_metric::PENDING_CREATING_PGS: return "PENDING_CREATING_PGS";
+ case daemon_metric::DISPATCH_QUEUE_THROTTLE: return "DISPATCH_QUEUE_THROTTLE";
case daemon_metric::NONE: return "NONE";
default: return "???";
}
vector<DaemonKey> osds;
};
+// Collects DISPATCH_QUEUE_THROTTLE metrics reported by daemons and
+// surfaces them as a single HEALTH_WARN check.
+class DispatchQueueThrottle final : public DaemonHealthMetricCollector {
+ bool _is_relevant(daemon_metric type) const override {
+ return type == daemon_metric::DISPATCH_QUEUE_THROTTLE;
+ }
+ health_check_t& _get_check(health_check_map_t& cm) const override {
+ return cm.get_or_add("DISPATCH_QUEUE_THROTTLE", HEALTH_WARN, "", 1);
+ }
+ bool _update(const DaemonKey& daemon, const DaemonHealthMetric& metric) override {
+ // Aggregate across all reporting daemons (as PendingPGs/SlowOps do);
+ // plain '=' would keep only the last daemon's count in the summary.
+ value.n += metric.get_n();
+ if (metric.get_n()) {
+ daemons.push_back(daemon);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ void _summarize(health_check_t& check) const override {
+ if (daemons.empty()) {
+ return;
+ }
+ check.summary = fmt::format("Dispatch Queue Throttling, {} messages throttled.", value.n);
+ }
+ vector<DaemonKey> daemons;
+};
+
} // anonymous namespace
unique_ptr<DaemonHealthMetricCollector>
return std::make_unique<SlowOps>();
case daemon_metric::PENDING_CREATING_PGS:
return std::make_unique<PendingPGs>();
+ case daemon_metric::DISPATCH_QUEUE_THROTTLE:
+ return std::make_unique<DispatchQueueThrottle>();
default:
return {};
}
cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
last_mon_command_tid(0),
version_req_id(0)
-{}
+{
+ cct->_conf.add_observer(this);
+}
MonClient::~MonClient()
{
+ cct->_conf.remove_observer(this);
}
int MonClient::build_initial_monmap()
}
}
+// Dispatcher hook: cluster-log a warning when a messenger throttle limit
+// is hit.  Returns true when the notification was handled, false when it
+// could not be (no log_client set up, or unknown throttle type).
+bool MonClient::ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) {
+ switch (ttype) {
+ case ms_throttle_t::MESSAGE:
+ break; // TODO
+ case ms_throttle_t::BYTES:
+ break; // TODO
+ case ms_throttle_t::DISPATCH_QUEUE:
+ {
+ //cluster log a warning that Dispatch Queue Throttle Limit hit
+ if (!log_client) {
+ return false; //cannot handle if the daemon didn't setup a log_client for me
+ }
+ LogChannelRef clog = log_client->create_channel(CLOG_CHANNEL_CLUSTER);
+ clog->warn() << "Throttler Limit has been hit. "
+ << "Some message processing may be significantly delayed. "
+ << "Additional info: " << tinfo.takenslots << "/"
+ << tinfo.maxslots << " bytes used, "
+ << tinfo.failedrequests << " messages throttled.";
+ }
+ break;
+ case ms_throttle_t::NONE:
+ // not throttled; nothing to log
+ break;
+ default:
+ return false;
+ }
+ return true;
+}
+
bool MonClient::_opened() const
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
return -EACCES;
}
+std::vector<std::string> MonClient::get_tracked_keys() const noexcept {
+  // Options watched for runtime changes; see handle_conf_change().
+  std::vector<std::string> keys;
+  keys.reserve(2);
+  keys.emplace_back("ms_dispatch_throttle_bytes");
+  keys.emplace_back("ms_dispatch_throttle_log_interval");
+  return keys;
+}
+
+// Apply runtime changes of the dispatch-throttle options by refreshing
+// the messenger's atomic copies (read on the connection read path).
+void MonClient::handle_conf_change(const ConfigProxy& conf,
+                      const std::set <std::string> &changed) {
+ ldout(cct, 10) << __func__ << " " << changed << dendl;
+ if (changed.count("ms_dispatch_throttle_bytes")) {
+ if (messenger) {
+ messenger->dispatch_throttle_bytes =
+ cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
+ }
+ }
+ if (changed.count("ms_dispatch_throttle_log_interval")) {
+ if (messenger) {
+ messenger->dispatch_throttle_log_interval =
+ cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
+ }
+ }
+}
+
AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
std::lock_guard l(monc_lock);
if (auth) {
const boost::system::error_category& monc_category() noexcept;
class MonClient : public Dispatcher,
- public AuthClient,
- public AuthServer, /* for mgr, osd, mds */
- public AdminSocketHook {
+ public AuthClient,
+ public AuthServer, /* for mgr, osd, mds */
+ public AdminSocketHook,
+ public md_config_obs_t {
static constexpr auto dout_subsys = ceph_subsys_monc;
public:
// Error, Newest, Oldest
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override { return false; }
+ bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) override;
void handle_monmap(MMonMap *m);
void handle_config(MConfig *m);
uint32_t auth_method,
const ceph::buffer::list& bl,
ceph::buffer::list *reply) override;
-
+ // md_config_obs_t (config observer)
+ std::vector<std::string> get_tracked_keys() const noexcept override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed) override;
void set_entity_name(EntityName name) { entity_name = name; }
void set_handle_authentication_dispatcher(Dispatcher *d) {
handle_authentication_dispatcher = d;
uint64_t get_id() {
return next_id++;
}
+
+ Messenger* get_messenger() const {
+ return msgr;
+ }
+
void start();
void entry();
void wait();
#include "include/ceph_assert.h"
#include "include/common_fwd.h"
#include "msg/MessageRef.h"
+#include "msg/msg_types.h"
#include <variant>
class Dispatcher {
public:
+ /// Snapshot of a throttle's state, passed to ms_handle_throttle().
+ struct ThrottleInfo {
+ uint64_t takenslots;     ///< units currently taken (bytes for DISPATCH_QUEUE)
+ uint64_t maxslots;       ///< configured maximum
+ uint64_t failedrequests; ///< requests rejected so far
+ };
+
/* Ordering of dispatch for a list of Dispatchers. */
using priority_t = uint32_t;
static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4;
return false;
}
+ /**
+  * handle a throttle limit being hit (e.g. by cluster-logging it).
+  *
+  * @param ttype which throttle fired
+  * @param tinfo snapshot of the throttle's state
+  * @return true if handled
+  * @return false if not handled (the default implementation)
+  */
+ virtual bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) {
+ return false;
+ }
+
/**
* @} //Authentication
*/
{
auth_registry.refresh_config();
comp_registry.refresh_config();
+ dispatch_throttle_bytes.store(cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes"));
+ dispatch_throttle_log_interval.store(cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval"));
}
void Messenger::set_endpoint_addr(const entity_addr_t& a,
public:
AuthClient *auth_client = 0;
AuthServer *auth_server = 0;
+ // Cached copies of ms_dispatch_throttle_bytes / ms_dispatch_throttle_log_interval;
+ // refreshed on config change and read from the connection read path.
+ // Value-initialize so reads are well-defined even before the first refresh.
+ std::atomic<uint64_t> dispatch_throttle_bytes{0};
+ std::atomic<std::chrono::seconds> dispatch_throttle_log_interval{std::chrono::seconds(0)};
#ifdef UNIT_TESTS_BUILT
Interceptor *interceptor = nullptr;
void set_require_authorizer(bool b) {
require_authorizer = b;
}
+ /**
+  * Notify every registered Dispatcher that a throttle limit has been
+  * hit.  Call whenever the connections are getting throttled.
+  *
+  * @param ttype Throttle type
+  * @param tinfo Throttle info
+  */
+ void ms_deliver_throttle(ms_throttle_t ttype, const Dispatcher::ThrottleInfo& tinfo) {
+   for (auto& [prio, disp] : dispatchers) {
+     static_cast<void>(prio); // only the dispatcher itself is needed
+     disp->ms_handle_throttle(ttype, tinfo);
+   }
+ }
/**
* @} // Dispatcher Interfacing
AsyncConnection *connection;
AsyncMessenger *messenger;
CephContext *cct;
+ ceph::coarse_mono_time throttle_prev_log {ceph::coarse_mono_clock::zero()};
+ const std::chrono::seconds THROTTLE_DELIVER_INTERVAL {std::chrono::seconds(120)};
public:
std::shared_ptr<AuthConnectionMeta> auth_meta;
ldout(cct, 20) << __func__ << dendl;
if (cur_msg_size) {
+ Messenger* msgr = connection->dispatch_queue->get_messenger();
+ //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
+ connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes.load());
+
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
ldout(cct, 1)
<< connection->dispatch_queue->dispatch_throttler.get_current() << "/"
<< connection->dispatch_queue->dispatch_throttler.get_max()
<< " failed, just wait." << dendl;
+ ceph::coarse_mono_time throttle_now = ceph::coarse_mono_clock::now();
+ std::chrono::seconds configured_interval = msgr->dispatch_throttle_log_interval.load();
+ if (configured_interval.count()) {
+ if (std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev_log) >=
+ configured_interval) {
+ //Cluster logging that throttling is occurring.
+ Dispatcher::ThrottleInfo tinfo;
+ tinfo.takenslots = connection->dispatch_queue->dispatch_throttler.get_current();
+ tinfo.maxslots = connection->dispatch_queue->dispatch_throttler.get_max();
+ tinfo.failedrequests = connection->dispatch_queue->dispatch_throttler.get_failed();
+ msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE, tinfo);
+ throttle_prev_log = throttle_now;
+ }
+ }
// following thread pool deal with th full message queue isn't a
// short time, so we can wait a ms.
if (connection->register_time_events.empty()) {
}
return nullptr;
}
+ else {
+ //Don't deliver ms_throttle_t::NONE forever. Limit it for THROTTLE_DELIVER_INTERVAL seconds
+ //since the last ms_throttle_t::DISPATCH_QUEUE delivery.
+ if (std::chrono::duration_cast<std::chrono::seconds>
+ (ceph::coarse_mono_clock::now() - throttle_prev_log) <= THROTTLE_DELIVER_INTERVAL) {
+ Dispatcher::ThrottleInfo tinfo = {0, 0, 0};
+ msgr->ms_deliver_throttle(ms_throttle_t::NONE, tinfo);
+ }
+ }
}
throttle_stamp = ceph_clock_now();
const size_t cur_msg_size = get_current_msg_size();
if (cur_msg_size) {
+ Messenger* msgr = connection->dispatch_queue->get_messenger();
+ //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
+ connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes.load());
+
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
ldout(cct, 1)
<< connection->dispatch_queue->dispatch_throttler.get_current() << "/"
<< connection->dispatch_queue->dispatch_throttler.get_max()
<< " failed, just wait." << dendl;
+ ceph::coarse_mono_time throttle_now = ceph::coarse_mono_clock::now();
+ std::chrono::seconds configured_interval = msgr->dispatch_throttle_log_interval.load();
+ if (configured_interval.count()) {
+ if (std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev_log) >=
+ configured_interval) {
+ //Cluster logging that throttling is occurring.
+ Dispatcher::ThrottleInfo tinfo;
+ tinfo.takenslots = connection->dispatch_queue->dispatch_throttler.get_current();
+ tinfo.maxslots = connection->dispatch_queue->dispatch_throttler.get_max();
+ tinfo.failedrequests = connection->dispatch_queue->dispatch_throttler.get_failed();
+ msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE, tinfo);
+ throttle_prev_log = throttle_now;
+ }
+ }
// following thread pool deal with th full message queue isn't a
// short time, so we can wait a ms.
if (connection->register_time_events.empty()) {
}
return nullptr;
}
+ else {
+ //Don't deliver ms_throttle_t::NONE forever. Limit it for THROTTLE_DELIVER_INTERVAL seconds
+ //since the last ms_throttle_t::DISPATCH_QUEUE delivery.
+ if (std::chrono::duration_cast<std::chrono::seconds>
+ (ceph::coarse_mono_clock::now() - throttle_prev_log) <= THROTTLE_DELIVER_INTERVAL) {
+ Dispatcher::ThrottleInfo tinfo = {0, 0, 0};
+ msgr->ms_deliver_throttle(ms_throttle_t::NONE, tinfo);
+ }
+ }
}
throttle_stamp = ceph_clock_now();
return out << n;
}
+// Identifies which Throttle triggered an ms_handle_throttle() callback.
+enum class ms_throttle_t {
+ MESSAGE, // per-message throttle (handlers: TODO)
+ BYTES, // byte-count throttle (handlers: TODO)
+ DISPATCH_QUEUE, // DispatchQueue's dispatch_throttler (bytes queued for dispatch)
+ NONE // delivered when no throttling is in effect
+};
+
#endif
return true;
}
+// Dispatcher hook: record messenger throttle notifications so that
+// get_health_metrics() can raise (and later clear) the
+// DISPATCH_QUEUE_THROTTLE health warning.  Returns true if handled.
+bool OSD::ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) {
+ switch (ttype) {
+ case ms_throttle_t::MESSAGE:
+ break; // TODO
+ case ms_throttle_t::BYTES:
+ break; // TODO
+ case ms_throttle_t::DISPATCH_QUEUE:
+ {
+ //save the latest throttled time, save the number of messages throttled.
+ // The lock keeps the two stores below atomic as a pair with respect
+ // to the reset sequence in the NONE branch.
+ std::lock_guard l(dispatch_queue_throttle_lock);
+ last_throttled.store(ceph::coarse_mono_clock::now());
+ messages_throttled.store(tinfo.failedrequests);
+ }
+ break;
+ case ms_throttle_t::NONE:
+ {
+ //No Throttling
+ std::lock_guard l(dispatch_queue_throttle_lock);
+ if (last_throttled.load() != ceph::coarse_mono_clock::zero()) {
+ //Don't be hurry to reset last_throttled. Give get_health_metrics()
+ //THROTTLE_STATUS_INTERVAL seconds to read and display the previous status.
+ if (std::chrono::duration_cast<std::chrono::seconds>
+ (ceph::coarse_mono_clock::now() - last_throttled) >= THROTTLE_STATUS_INTERVAL) {
+ last_throttled.store(ceph::coarse_mono_clock::zero());
+ messages_throttled.store(0);
+ }
+ }
+ }
+ break;
+ default:
+ return false;
+ }
+ return true;
+}
+
struct CB_OSD_GetVersion {
OSD *osd;
explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
}
metrics.emplace_back(daemon_metric::PENDING_CREATING_PGS, n_primaries);
}
+ {
+ std::lock_guard l(dispatch_queue_throttle_lock);
+ if (messages_throttled.load() > 0) {
+ stringstream ss;
+ ss << "Dispatch Queue Throttling, " << messages_throttled.load() << " messages throttled.";
+ lgeneric_subdout(cct,osd,1) << ss.str() << dendl;
+ clog->warn() << ss.str();
+ metrics.emplace_back(daemon_metric::DISPATCH_QUEUE_THROTTLE, messages_throttled.load());
+ }
+ }
return metrics;
}
private:
std::atomic<int> state{STATE_INITIALIZING};
+ std::atomic<ceph::coarse_mono_time> last_throttled {ceph::coarse_mono_clock::zero()};
+ std::atomic<int64_t> messages_throttled {0};
+ const std::chrono::seconds THROTTLE_STATUS_INTERVAL {std::chrono::seconds(10)};
public:
int get_state() const {
std::atomic<size_t> num_pgs = {0};
std::mutex pending_creates_lock;
+ std::mutex dispatch_queue_throttle_lock;
using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
std::set<create_from_osd_t> pending_creates_from_osd;
unsigned pending_creates_from_mon = 0;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
+ bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) override;
public:
/* internal and external can point to the same messenger, they will still