log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
op_tracker(cct, m->cct->_conf->mds_enable_op_tracker),
finisher(cct),
- sessionmap(this), asok_hook(NULL) {
+ sessionmap(this),
+ progress_thread(this),
+ asok_hook(NULL)
+{
hb = cct->get_heartbeat_map()->add_worker("MDS");
// schedule tick
reset_tick();
+ // Start handler for finished_queue
+ progress_thread.create();
+
create_logger();
set_up_admin_socket();
g_conf->add_observer(this);
if (beacon.is_laggy()) {
dout(5) << "tick bailing out since we seem laggy" << dendl;
return;
+ } else {
+ // Wake up thread in case we use to be laggy and have waiting_for_nolaggy
+ // messages to progress.
+ progress_thread.signal();
}
// make sure mds log flushes, trims periodically
op_tracker.on_shutdown();
+ progress_thread.shutdown();
+
// shut down messenger
messenger->shutdown();
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
handle_mds_map(static_cast<MMDSMap*>(m));
break;
- case MSG_MDS_BEACON:
- ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
- // no-op, Beacon handles this on our behalf but we listen for the
- // message to get the side effect of handling finished_queue and
- // waiting_for_nolaggy at end of MDS dispatch.
- m->put();
- break;
// misc
case MSG_MON_COMMAND:
return false;
}
-/* If this function returns true, it has put the message. If it returns false,
- * it has not put the message. */
-bool MDS::_dispatch(Message *m)
+/**
+ * Advance finished_queue and waiting_for_nolaggy.
+ *
+ * Usually drain both queues, but may not drain waiting_for_nolaggy
+ * if beacon is currently laggy.
+ */
+void MDS::_advance_queues()
{
- if (is_stale_message(m)) {
- m->put();
- return true;
- }
+ assert(mds_lock.is_locked_by_me());
- // core
- if (!handle_core_message(m)) {
- if (beacon.is_laggy()) {
- dout(10) << " laggy, deferring " << *m << dendl;
- waiting_for_nolaggy.push_back(m);
- } else {
- if (!handle_deferrable_message(m)) {
- dout(0) << "unrecognized message " << *m << dendl;
- m->put();
- return false;
- }
- }
- }
-
- if (dispatch_depth > 1)
- return true;
-
- // finish any triggered contexts
while (!finished_queue.empty()) {
dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
dout(10) << finished_queue << dendl;
}
while (!waiting_for_nolaggy.empty()) {
-
// stop if we're laggy now!
if (beacon.is_laggy())
- return true;
+ break;
Message *old = waiting_for_nolaggy.front();
waiting_for_nolaggy.pop_front();
heartbeat_reset();
}
+}
+
+/* If this function returns true, it has put the message. If it returns false,
+ * it has not put the message. */
+bool MDS::_dispatch(Message *m)
+{
+ if (is_stale_message(m)) {
+ m->put();
+ return true;
+ }
+
+ // core
+ if (!handle_core_message(m)) {
+ if (beacon.is_laggy()) {
+ dout(10) << " laggy, deferring " << *m << dendl;
+ waiting_for_nolaggy.push_back(m);
+ } else {
+ if (!handle_deferrable_message(m)) {
+ dout(0) << "unrecognized message " << *m << dendl;
+ m->put();
+ return false;
+ }
+ }
+ }
+
+ if (dispatch_depth > 1)
+ return true;
+
+ // finish any triggered contexts
+ _advance_queues();
+
+ if (beacon.is_laggy()) {
+ // We've gone laggy during dispatch, don't do any
+ // more housekeeping
+ return true;
+ }
// done with all client replayed requests?
if (is_clientreplay() &&
cct->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0);
}
+
+void *MDS::ProgressThread::entry()
+{
+ Mutex::Locker l(mds->mds_lock);
+ while (true) {
+ while (!stopping && (mds->finished_queue.empty() && mds->waiting_for_nolaggy.empty())) {
+ cond.Wait(mds->mds_lock);
+ }
+
+ if (stopping) {
+ break;
+ }
+
+ mds->_advance_queues();
+ }
+
+ return NULL;
+}
+
+
+void MDS::ProgressThread::shutdown()
+{
+ assert(mds->mds_lock.is_locked_by_me());
+
+ stopping = true;
+ cond.Signal();
+ mds->mds_lock.Unlock();
+ join();
+ mds->mds_lock.Lock();
+}
// -- waiters --
+private:
list<MDSInternalContextBase*> finished_queue;
+ void _advance_queues();
+public:
void queue_waiter(MDSInternalContextBase *c) {
finished_queue.push_back(c);
+ progress_thread.signal();
}
void queue_waiters(list<MDSInternalContextBase*>& ls) {
finished_queue.splice( finished_queue.end(), ls );
+ progress_thread.signal();
}
bool queue_one_replay() {
if (replay_queue.empty())
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
+private:
+ class ProgressThread : public Thread {
+ MDS *mds;
+ bool stopping;
+ Cond cond;
+ public:
+ ProgressThread(MDS *mds_) : mds(mds_), stopping(false) {}
+ void * entry();
+ void shutdown();
+ void signal() {cond.Signal();}
+ } progress_thread;
+ void _progress_thread();
+
public:
MDS(const std::string &n, Messenger *m, MonClient *mc);
~MDS();