.set_default(100_M)
.set_description("Limit messages that are read off the network but still being processed"),
+ // Minimum time between repeated "dispatch throttle limit hit" log messages.
+ Option("ms_dispatch_throttle_log_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+ .set_default(30)
+ .set_min(0)
+ .set_description("Interval in seconds for high verbosity debug log message when the dispatch throttle limit is hit"),
+
Option("ms_bind_ipv4", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(true)
.set_description("Bind servers to IPv4 address(es)")
initialized = true;
+ cct->_conf.add_observer(this);
messenger->set_auth_client(this);
messenger->add_dispatcher_head(this);
if (initialized) {
initialized = false;
}
+ cct->_conf.remove_observer(this);
monc_lock.lock();
timer.shutdown();
stopping = false;
}
}
+/**
+ * Dispatcher hook invoked when a messenger throttle limit has been hit.
+ *
+ * Only ms_throttle_t::DISPATCH_QUEUE is acted on: a warning is written to
+ * the cluster log channel created from log_client.
+ *
+ * @param ttype the throttle that was hit
+ * @return true if a cluster-log warning was emitted; false if nothing was
+ *         done (no log_client configured, or a throttle type that has no
+ *         handling yet)
+ */
+bool MonClient::ms_handle_throttle(ms_throttle_t ttype) {
+  switch (ttype) {
+  case ms_throttle_t::DISPATCH_QUEUE:
+    {
+      // cannot handle if the daemon didn't set up a log_client for me
+      if (!log_client) {
+        return false;
+      }
+      // cluster log a warning that the Dispatch Queue Throttle Limit was hit
+      LogChannelRef clog = log_client->create_channel(CLOG_CHANNEL_CLUSTER);
+      clog->warn() << "Throttler Limit has been hit. "
+                   << "Some message processing may be significantly delayed.";
+      return true;
+    }
+  case ms_throttle_t::MESSAGE: // TODO
+  case ms_throttle_t::BYTES:   // TODO
+  default:
+    // Nothing is done for these types yet, so do not claim the event:
+    // returning true here would report it as handled even though no
+    // action was taken.
+    return false;
+  }
+}
+
bool MonClient::_opened() const
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
return -EACCES;
}
+/**
+ * md_config_obs_t hook: list the configuration options this object tracks.
+ *
+ * @return a null-terminated array of option names with static storage
+ */
+const char** MonClient::get_tracked_conf_keys() const {
+  static const char* tracked_keys[] = {
+    "ms_dispatch_throttle_bytes",
+    "ms_dispatch_throttle_log_interval",
+    nullptr
+  };
+  return tracked_keys;
+}
+
+/**
+ * md_config_obs_t hook: react to a change of a tracked option.
+ *
+ * If either dispatch-throttle option changed and a messenger is set, both
+ * values cached on the messenger are re-read from cct->_conf.
+ *
+ * @param conf    configuration proxy supplied by the caller (not read here)
+ * @param changed names of the options that changed
+ */
+void MonClient::handle_conf_change(const ConfigProxy& conf, const std::set<std::string> &changed) {
+  const bool throttle_opt_changed =
+    changed.count("ms_dispatch_throttle_bytes") ||
+    changed.count("ms_dispatch_throttle_log_interval");
+  if (!throttle_opt_changed) {
+    return;
+  }
+  if (!messenger) {
+    return;
+  }
+  // Refresh both cached values, whichever of the two options changed.
+  messenger->dispatch_throttle_bytes =
+    cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
+  messenger->dispatch_throttle_log_interval =
+    cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
+}
+
AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
std::lock_guard l(monc_lock);
if (auth) {
const boost::system::error_category& monc_category() noexcept;
class MonClient : public Dispatcher,
- public AuthClient,
- public AuthServer /* for mgr, osd, mds */ {
+ public AuthClient,
+ public AuthServer, /* for mgr, osd, mds */
+ public md_config_obs_t {
static constexpr auto dout_subsys = ceph_subsys_monc;
public:
// Error, Newest, Oldest
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override { return false; }
+ bool ms_handle_throttle(ms_throttle_t ttype) override;
void handle_monmap(MMonMap *m);
void handle_config(MConfig *m);
uint32_t auth_method,
const ceph::buffer::list& bl,
ceph::buffer::list *reply) override;
+ // md_config_obs_t (config observer)
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set<std::string> &changed) override;
void set_entity_name(EntityName name) { entity_name = name; }
void set_handle_authentication_dispatcher(Dispatcher *d) {
uint64_t get_id() {
return next_id++;
}
+
+ /// @return the Messenger pointer stored in the msgr member
+ Messenger* get_messenger() const { return msgr; }
+
void start();
void entry();
void wait();
return 0;
}
+ /**
+ * Called when a throttle limit has been hit, so that the dispatcher can
+ * react to it (for example by writing a warning to the cluster log).
+ *
+ * This default implementation does nothing and reports the event as
+ * not handled; override it to act on the event.
+ *
+ * @param ttype the throttle that was hit
+ * @return true if handled
+ *         false if not handled
+ */
+ virtual bool ms_handle_throttle(ms_throttle_t ttype) {
+ return false;
+ }
+
/**
* @} //Authentication
*/
auth_registry(cct)
{
auth_registry.refresh_config();
+ dispatch_throttle_bytes = cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
+ dispatch_throttle_log_interval = cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
}
void Messenger::set_endpoint_addr(const entity_addr_t& a,
public:
AuthClient *auth_client = 0;
AuthServer *auth_server = 0;
+ uint64_t dispatch_throttle_bytes; ///< cached value of the "ms_dispatch_throttle_bytes" option. NOTE(review): assigned from a config-change handler and read on the message read path with no synchronization visible here — confirm thread-safety
+ std::chrono::seconds dispatch_throttle_log_interval; ///< cached value of the "ms_dispatch_throttle_log_interval" option
#ifdef UNIT_TESTS_BUILT
Interceptor *interceptor = nullptr;
void set_require_authorizer(bool b) {
require_authorizer = b;
}
+ /**
+  * Tell the dispatchers that a throttle limit has been hit. Call this
+  * whenever connections are being throttled.
+  *
+  * Dispatchers are offered the event in order; delivery stops at the first
+  * one whose ms_handle_throttle() returns true.
+  *
+  * @param ttype Throttle type
+  */
+ void ms_deliver_throttle(ms_throttle_t ttype) {
+   for (const auto &d : dispatchers) {
+     const bool handled = d->ms_handle_throttle(ttype);
+     if (handled) {
+       break;
+     }
+   }
+ }
/**
* @} // Dispatcher Interfacing
AsyncConnection *connection;
AsyncMessenger *messenger;
CephContext *cct;
+ ceph::mono_time throttle_prev = ceph::mono_clock::zero(); ///< time the throttle-limit message was last logged; compared with the log interval to rate-limit it
public:
std::shared_ptr<AuthConnectionMeta> auth_meta;
ldout(cct, 20) << __func__ << dendl;
if (cur_msg_size) {
+ Messenger* msgr = connection->dispatch_queue->get_messenger();
+ // Pick up any runtime change to ms_dispatch_throttle_bytes (QA tests are expected to change it).
+ connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes);
+
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
ldout(cct, 10)
<< connection->dispatch_queue->dispatch_throttler.get_current() << "/"
<< connection->dispatch_queue->dispatch_throttler.get_max()
<< " failed, just wait." << dendl;
+ ceph::mono_time throttle_now = ceph::mono_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev);
+ if (duration >= msgr->dispatch_throttle_log_interval) {
+ ldout(cct, 1) << __func__ << " Throttler Limit has been hit. "
+ << "Some message processing may be significantly delayed." << dendl;
+ throttle_prev = throttle_now;
+
+ //Cluster logging that throttling is occurring.
+ msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE);
+ }
// following thread pool deal with th full message queue isn't a
// short time, so we can wait a ms.
if (connection->register_time_events.empty()) {
const size_t cur_msg_size = get_current_msg_size();
if (cur_msg_size) {
+ Messenger* msgr = connection->dispatch_queue->get_messenger();
+ // Pick up any runtime change to ms_dispatch_throttle_bytes (QA tests are expected to change it).
+ connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes);
+
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
ldout(cct, 10)
<< connection->dispatch_queue->dispatch_throttler.get_current() << "/"
<< connection->dispatch_queue->dispatch_throttler.get_max()
<< " failed, just wait." << dendl;
+ ceph::mono_time throttle_now = ceph::mono_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev);
+ if (duration >= msgr->dispatch_throttle_log_interval) {
+ ldout(cct, 1) << __func__ << " Throttler Limit has been hit. "
+ << "Some message processing may be significantly delayed." << dendl;
+ throttle_prev = throttle_now;
+
+ //Cluster logging that throttling is occurring.
+ msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE);
+ }
// following thread pool deal with th full message queue isn't a
// short time, so we can wait a ms.
if (connection->register_time_events.empty()) {
return out << n;
}
+enum class ms_throttle_t { // which throttle limit was hit; passed to ms_handle_throttle()
+ MESSAGE, // NOTE(review): presumably a message-count throttle; not raised anywhere in the visible code — confirm
+ BYTES, // NOTE(review): presumably a byte-count throttle; not raised anywhere in the visible code — confirm
+ DISPATCH_QUEUE // the dispatch queue's dispatch_throttler refused a message (get_or_fail() failed)
+};
+
#endif