- has not responded to cap revoke by MDS for over
- MDS_CLIENT_LATE_RELEASE
- responding to mclientcaps
- - Throttler Limit has been hit. Some message processing may be significantly delayed.
ceph:
log-ignorelist:
- Replacing daemon mds
- - Throttler Limit has been hit. Some message processing may be significantly delayed.
log-ignorelist:
- overall HEALTH_
- \(MON_DOWN\)
- - Throttler Limit has been hit. Some message processing may be significantly delayed.
tasks:
- mon_thrash:
check_mds_failover: True
ceph:
log-ignorelist:
- Replacing daemon mds
- - Throttler Limit has been hit. Some message processing may be significantly delayed.
log-ignorelist:
- overall HEALTH_
- \(MON_DOWN\)
- - Throttler Limit has been hit. Some message processing may be significantly delayed.
tasks:
- mon_thrash:
check_mds_failover: True
- but it is still running
- objects unfound and apparently lost
- MDS_SLOW_METADATA_IO
- - Throttler Limit has been hit. Some message processing may be significantly delayed.
tasks:
- thrashosds:
info = self.fs.mds_asok(['dump', 'inode', hex(ino)])
assert info['path'] == "/foo"
- def test_dispatch_queue_throttle_message(self):
- """
- That cluster log a warning when the Dispatch Queue Throttle Limit hits
- """
- self.config_set('mds', 'ms_dispatch_throttle_log_interval', 5)
- self.config_set('mds', 'ms_dispatch_throttle_bytes', 10240)
-
- # Create files & split across 10 directories, 1000 each.
- with self.assert_cluster_log("Throttler Limit has been hit. Some message processing may be significantly delayed.",
- invert_match=False, watch_channel="cluster"):
- for i in range(0, 10):
- self.mount_a.create_n_files("dir{0}/file".format(i), 1000, sync=False)
class TestCacheDrop(CephFSTestCase):
CLIENTS_REQUIRED = 1
.set_default(100_M)
.set_description("Limit messages that are read off the network but still being processed"),
- Option("ms_dispatch_throttle_log_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
- .set_default(30)
- .set_min(0)
- .set_description("Interval in seconds for high verbosity debug log message when the dispatch throttle limit are hit"),
-
Option("ms_bind_ipv4", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(true)
.set_description("Bind servers to IPv4 address(es)")
initialized = true;
- cct->_conf.add_observer(this);
messenger->set_auth_client(this);
messenger->add_dispatcher_head(this);
if (initialized) {
initialized = false;
}
- cct->_conf.remove_observer(this);
monc_lock.lock();
timer.shutdown();
stopping = false;
}
}
-bool MonClient::ms_handle_throttle(ms_throttle_t ttype) {
- switch (ttype) {
- case ms_throttle_t::MESSAGE:
- break; // TODO
- case ms_throttle_t::BYTES:
- break; // TODO
- case ms_throttle_t::DISPATCH_QUEUE:
- {
- //cluster log a warning that Dispatch Queue Throttle Limit hit
- if (!log_client) {
- return false; //cannot handle if the daemon didn't setup a log_client for me
- }
- LogChannelRef clog = log_client->create_channel(CLOG_CHANNEL_CLUSTER);
- clog->warn() << "Throttler Limit has been hit. "
- << "Some message processing may be significantly delayed.";
- }
- break;
- default:
- return false;
- }
- return true;
-}
-
bool MonClient::_opened() const
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
return -EACCES;
}
-const char** MonClient::get_tracked_conf_keys() const {
- static const char* KEYS[] = {
- "ms_dispatch_throttle_bytes",
- "ms_dispatch_throttle_log_interval",
- NULL
- };
- return KEYS;
-}
-
-void MonClient::handle_conf_change(const ConfigProxy& conf, const std::set<std::string> &changed) {
- if (changed.count("ms_dispatch_throttle_bytes") || changed.count("ms_dispatch_throttle_log_interval")) {
- if (messenger) {
- messenger->dispatch_throttle_bytes = cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
- messenger->dispatch_throttle_log_interval = cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
- }
- }
-}
-
AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
std::lock_guard l(monc_lock);
if (auth) {
const boost::system::error_category& monc_category() noexcept;
class MonClient : public Dispatcher,
- public AuthClient,
- public AuthServer, /* for mgr, osd, mds */
- public md_config_obs_t {
+ public AuthClient,
+ public AuthServer /* for mgr, osd, mds */ {
static constexpr auto dout_subsys = ceph_subsys_monc;
public:
// Error, Newest, Oldest
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override { return false; }
- bool ms_handle_throttle(ms_throttle_t ttype) override;
void handle_monmap(MMonMap *m);
void handle_config(MConfig *m);
uint32_t auth_method,
const ceph::buffer::list& bl,
ceph::buffer::list *reply) override;
- // md_config_obs_t (config observer)
- const char** get_tracked_conf_keys() const override;
- void handle_conf_change(
- const ConfigProxy& conf,
- const std::set<std::string> &changed) override;
void set_entity_name(EntityName name) { entity_name = name; }
void set_handle_authentication_dispatcher(Dispatcher *d) {
uint64_t get_id() {
return next_id++;
}
-
- Messenger* get_messenger() const {
- return msgr;
- }
-
void start();
void entry();
void wait();
return 0;
}
- /**
- * handle throttle limit hit and cluster log it.
- *
- * return true if handled
- * return false if not handled
- */
- virtual bool ms_handle_throttle(ms_throttle_t ttype) {
- return false;
- }
-
/**
* @} //Authentication
*/
auth_registry(cct)
{
auth_registry.refresh_config();
- dispatch_throttle_bytes = cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
- dispatch_throttle_log_interval = cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
}
void Messenger::set_endpoint_addr(const entity_addr_t& a,
public:
AuthClient *auth_client = 0;
AuthServer *auth_server = 0;
- uint64_t dispatch_throttle_bytes;
- std::chrono::seconds dispatch_throttle_log_interval;
#ifdef UNIT_TESTS_BUILT
Interceptor *interceptor = nullptr;
void set_require_authorizer(bool b) {
require_authorizer = b;
}
- /**
- * Notify each Dispatcher that the Throttle Limit has been hit. Call
- * this function whenever the connections are getting throttled.
- *
- * @param ttype Throttle type
- */
- void ms_deliver_throttle(ms_throttle_t ttype) {
- for (const auto &dispatcher : dispatchers) {
- if (dispatcher->ms_handle_throttle(ttype))
- return;
- }
- }
/**
* @} // Dispatcher Interfacing
AsyncConnection *connection;
AsyncMessenger *messenger;
CephContext *cct;
- ceph::mono_time throttle_prev = ceph::mono_clock::zero();
public:
std::shared_ptr<AuthConnectionMeta> auth_meta;
ldout(cct, 20) << __func__ << dendl;
if (cur_msg_size) {
- Messenger* msgr = connection->dispatch_queue->get_messenger();
- //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
- connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes);
-
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
ldout(cct, 10)
<< connection->dispatch_queue->dispatch_throttler.get_current() << "/"
<< connection->dispatch_queue->dispatch_throttler.get_max()
<< " failed, just wait." << dendl;
- ceph::mono_time throttle_now = ceph::mono_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev);
- if (duration >= msgr->dispatch_throttle_log_interval) {
- ldout(cct, 1) << __func__ << " Throttler Limit has been hit. "
- << "Some message processing may be significantly delayed." << dendl;
- throttle_prev = throttle_now;
-
- //Cluster logging that throttling is occurring.
- msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE);
- }
// following thread pool deal with th full message queue isn't a
// short time, so we can wait a ms.
if (connection->register_time_events.empty()) {
const size_t cur_msg_size = get_current_msg_size();
if (cur_msg_size) {
- Messenger* msgr = connection->dispatch_queue->get_messenger();
- //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
- connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes);
-
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
ldout(cct, 10)
<< connection->dispatch_queue->dispatch_throttler.get_current() << "/"
<< connection->dispatch_queue->dispatch_throttler.get_max()
<< " failed, just wait." << dendl;
- ceph::mono_time throttle_now = ceph::mono_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev);
- if (duration >= msgr->dispatch_throttle_log_interval) {
- ldout(cct, 1) << __func__ << " Throttler Limit has been hit. "
- << "Some message processing may be significantly delayed." << dendl;
- throttle_prev = throttle_now;
-
- //Cluster logging that throttling is occurring.
- msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE);
- }
// following thread pool deal with th full message queue isn't a
// short time, so we can wait a ms.
if (connection->register_time_events.empty()) {
return out << n;
}
-enum class ms_throttle_t {
- MESSAGE,
- BYTES,
- DISPATCH_QUEUE
-};
-
#endif