From 9ee801524f729dc0e6a544b4b38befbcdd93ff39 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 27 Aug 2014 22:36:10 +0100 Subject: [PATCH] mds: add thread to progress queues outside dispatch This speeds up processing of queued waiters. Fixes: #9252 Signed-off-by: John Spray --- src/mds/Beacon.cc | 3 +- src/mds/MDS.cc | 123 +++++++++++++++++++++++++++++++++------------- src/mds/MDS.h | 18 +++++++ 3 files changed, 107 insertions(+), 37 deletions(-) diff --git a/src/mds/Beacon.cc b/src/mds/Beacon.cc index 83a7efd1b1a8c..7ccbe0e2aa4ce 100644 --- a/src/mds/Beacon.cc +++ b/src/mds/Beacon.cc @@ -79,10 +79,9 @@ bool Beacon::ms_dispatch(Message *m) if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) { handle_mds_beacon(static_cast(m)); } + return true; } - // Let message fall through to MDS so that we get the execution of - // his finished_queue and waiting_for_nolaggy stuff. return false; } diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 53a8cfd42baca..2c286db0d6ab2 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -104,7 +104,10 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS), op_tracker(cct, m->cct->_conf->mds_enable_op_tracker), finisher(cct), - sessionmap(this), asok_hook(NULL) { + sessionmap(this), + progress_thread(this), + asok_hook(NULL) +{ hb = cct->get_heartbeat_map()->add_worker("MDS"); @@ -683,6 +686,9 @@ int MDS::init(MDSMap::DaemonState wanted_state) // schedule tick reset_tick(); + // Start handler for finished_queue + progress_thread.create(); + create_logger(); set_up_admin_socket(); g_conf->add_observer(this); @@ -714,6 +720,10 @@ void MDS::tick() if (beacon.is_laggy()) { dout(5) << "tick bailing out since we seem laggy" << dendl; return; + } else { + // Wake up thread in case we used to be laggy and have waiting_for_nolaggy + // messages to progress. 
+ progress_thread.signal(); } // make sure mds log flushes, trims periodically @@ -1756,6 +1766,8 @@ void MDS::suicide() op_tracker.on_shutdown(); + progress_thread.shutdown(); + // shut down messenger messenger->shutdown(); @@ -1876,13 +1888,6 @@ bool MDS::handle_core_message(Message *m) ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS); handle_mds_map(static_cast(m)); break; - case MSG_MDS_BEACON: - ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON); - // no-op, Beacon handles this on our behalf but we listen for the - // message to get the side effect of handling finished_queue and - // waiting_for_nolaggy at end of MDS dispatch. - m->put(); - break; // misc case MSG_MON_COMMAND: @@ -2002,33 +2007,16 @@ bool MDS::is_stale_message(Message *m) return false; } -/* If this function returns true, it has put the message. If it returns false, - * it has not put the message. */ -bool MDS::_dispatch(Message *m) +/** + * Advance finished_queue and waiting_for_nolaggy. + * + * Usually drain both queues, but may not drain waiting_for_nolaggy + * if beacon is currently laggy. + */ +void MDS::_advance_queues() { - if (is_stale_message(m)) { - m->put(); - return true; - } + assert(mds_lock.is_locked_by_me()); - // core - if (!handle_core_message(m)) { - if (beacon.is_laggy()) { - dout(10) << " laggy, deferring " << *m << dendl; - waiting_for_nolaggy.push_back(m); - } else { - if (!handle_deferrable_message(m)) { - dout(0) << "unrecognized message " << *m << dendl; - m->put(); - return false; - } - } - } - - if (dispatch_depth > 1) - return true; - - // finish any triggered contexts while (!finished_queue.empty()) { dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl; dout(10) << finished_queue << dendl; @@ -2048,10 +2036,9 @@ bool MDS::_dispatch(Message *m) } while (!waiting_for_nolaggy.empty()) { - // stop if we're laggy now! 
if (beacon.is_laggy()) - return true; + break; Message *old = waiting_for_nolaggy.front(); waiting_for_nolaggy.pop_front(); @@ -2065,6 +2052,42 @@ bool MDS::_dispatch(Message *m) heartbeat_reset(); } +} + +/* If this function returns true, it has put the message. If it returns false, + * it has not put the message. */ +bool MDS::_dispatch(Message *m) +{ + if (is_stale_message(m)) { + m->put(); + return true; + } + + // core + if (!handle_core_message(m)) { + if (beacon.is_laggy()) { + dout(10) << " laggy, deferring " << *m << dendl; + waiting_for_nolaggy.push_back(m); + } else { + if (!handle_deferrable_message(m)) { + dout(0) << "unrecognized message " << *m << dendl; + m->put(); + return false; + } + } + } + + if (dispatch_depth > 1) + return true; + + // finish any triggered contexts + _advance_queues(); + + if (beacon.is_laggy()) { + // We've gone laggy during dispatch, don't do any + // more housekeeping + return true; + } // done with all client replayed requests? if (is_clientreplay() && @@ -2358,3 +2381,33 @@ void MDS::heartbeat_reset() cct->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0); } + +void *MDS::ProgressThread::entry() +{ + Mutex::Locker l(mds->mds_lock); + while (true) { + while (!stopping && (mds->finished_queue.empty() && mds->waiting_for_nolaggy.empty())) { + cond.Wait(mds->mds_lock); + } + + if (stopping) { + break; + } + + mds->_advance_queues(); + } + + return NULL; +} + + +void MDS::ProgressThread::shutdown() +{ + assert(mds->mds_lock.is_locked_by_me()); + + stopping = true; + cond.Signal(); + mds->mds_lock.Unlock(); + join(); + mds->mds_lock.Lock(); +} diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 6a8bd795bce31..b77f60a29fd5f 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -263,13 +263,18 @@ class MDS : public Dispatcher, public md_config_obs_t { // -- waiters -- +private: list finished_queue; + void _advance_queues(); +public: void queue_waiter(MDSInternalContextBase *c) { finished_queue.push_back(c); + 
progress_thread.signal(); } void queue_waiters(list& ls) { finished_queue.splice( finished_queue.end(), ls ); + progress_thread.signal(); } bool queue_one_replay() { if (replay_queue.empty()) @@ -319,6 +324,19 @@ class MDS : public Dispatcher, public md_config_obs_t { bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con); +private: + class ProgressThread : public Thread { + MDS *mds; + bool stopping; + Cond cond; + public: + ProgressThread(MDS *mds_) : mds(mds_), stopping(false) {} + void * entry(); + void shutdown(); + void signal() {cond.Signal();} + } progress_thread; + void _progress_thread(); + public: MDS(const std::string &n, Messenger *m, MonClient *mc); ~MDS(); -- 2.39.5