From 399cae9a05949035c302ed675ca261e93a35ee02 Mon Sep 17 00:00:00 2001
From: Sage Weil
Date: Wed, 23 Mar 2011 17:17:44 -0700
Subject: [PATCH] mds: reimplement laggy

The goal is for the MDS to stop processing requests when it hasn't heard
from the monitors, to avoid a situation where a rogue process goes off
doing its own thing.  Yes, if we fail it over, the cmds can't write to the
object store, but it can still reply to clients when it may not be
appropriate or good to do so.

The old logic was fragile and wonky, with messages getting deferred and
then re-deferred.  This implementation is much cleaner and should be more
efficient and less fragile.  There are still improvements to be made
around which messages we do/do not process when we think we're laggy.

Signed-off-by: Sage Weil
---
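[Editor's note, not part of the commit: the sketch below distills the new
laggy mechanics for review.  The beacon-grace check, the was_laggy flag,
and the defer/drain flow mirror the patch; everything else (LaggyGate, the
std::chrono types, the toy Message) is a stand-in, not Ceph code.]

// Build: g++ -std=c++11 laggy_sketch.cc
#include <chrono>
#include <deque>
#include <iostream>

using Clock = std::chrono::steady_clock;

struct LaggyGate {
  Clock::time_point last_acked_beacon{};  // unset until the first ack
  double beacon_grace = 15.0;             // plays the role of g_conf.mds_beacon_grace
  bool was_laggy = false;

  // Like MDS::is_laggy(): laggy iff the last acked beacon is too old.
  bool is_laggy() {
    if (last_acked_beacon == Clock::time_point{})
      return false;                       // nothing acked yet
    std::chrono::duration<double> since = Clock::now() - last_acked_beacon;
    if (since.count() > beacon_grace) {
      was_laggy = true;
      return true;
    }
    return false;
  }

  // Like handle_mds_beacon(): a fresh ack clears the laggy state.
  void beacon_acked() {
    last_acked_beacon = Clock::now();
    was_laggy = false;
  }
};

struct Message { int type; };

int main() {
  LaggyGate gate;
  std::deque<Message> waiting_for_nolaggy;  // deferred messages, kept raw

  // Dispatch path: core messages would always run; deferrable ones are
  // queued while laggy and drained once a beacon ack arrives.
  Message m{42};
  if (gate.is_laggy())
    waiting_for_nolaggy.push_back(m);
  else
    std::cout << "dispatch " << m.type << "\n";

  gate.beacon_acked();
  while (!waiting_for_nolaggy.empty() && !gate.is_laggy()) {
    std::cout << "process deferred " << waiting_for_nolaggy.front().type << "\n";
    waiting_for_nolaggy.pop_front();
  }
}

[One design point visible in the diff: deferred messages are now plain
Message pointers drained inside _dispatch(), so a message deferred while
laggy is no longer wrapped in a C_MDS_RetryMessage context and re-deferred
repeatedly as in the old path.]
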
 src/mds/MDS.cc | 356 ++++++++++++++++++++++++++-----------------------
 src/mds/MDS.h  |  18 +--
 2 files changed, 200 insertions(+), 174 deletions(-)

diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc
index 02160583d74db..7d5d77aa2579d 100644
--- a/src/mds/MDS.cc
+++ b/src/mds/MDS.cc
@@ -131,7 +131,7 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
   beacon_last_seq = 0;
   beacon_sender = 0;
   beacon_killer = 0;
-  laggy = false;
+  was_laggy = false;
 
   // tick
   tick_event = 0;
@@ -509,7 +509,6 @@ int MDS::init(int wanted_state)
 
   // schedule tick
   reset_tick();
-  last_tick = g_clock.now();
 
   open_logger();
 
@@ -536,18 +535,10 @@ void MDS::tick()
 
   clog.send_log();
 
-  utime_t now = g_clock.now();
-  utime_t delay = now;
-  delay -= last_tick;
-  if (delay > g_conf.mds_session_timeout / 2) {
-    dout(0) << " last tick was " << delay << " > " << g_conf.mds_tick_interval
-            << " seconds ago, laggy_until " << laggy_until
-            << ", setting laggy flag" << dendl;
-    laggy = true;
-  }
-  if (laggy)
+  if (is_laggy()) {
+    dout(5) << "tick bailing out since we seem laggy" << dendl;
     return;
-  last_tick = now;
+  }
 
   // make sure mds log flushes, trims periodically
   mdlog->flush();
@@ -560,6 +551,7 @@ void MDS::tick()
   }
 
   // log
+  utime_t now = g_clock.now();
   mds_load_t load = balancer->get_load(now);
 
   if (logger) {
@@ -629,6 +621,24 @@ void MDS::beacon_send()
   timer.add_event_after(g_conf.mds_beacon_interval, beacon_sender);
 }
 
+
+bool MDS::is_laggy()
+{
+  if (beacon_last_acked_stamp == utime_t())
+    return false;
+
+  utime_t now = g_clock.now();
+  utime_t since = now - beacon_last_acked_stamp;
+  if (since > g_conf.mds_beacon_grace) {
+    dout(5) << "is_laggy " << since << " > " << g_conf.mds_beacon_grace
+            << " since last acked beacon" << dendl;
+    was_laggy = true;
+    return true;
+  }
+  return false;
+}
+
+
 /* This function puts the passed message before returning */
 void MDS::handle_mds_beacon(MMDSBeacon *m)
 {
@@ -645,18 +655,16 @@ void MDS::handle_mds_beacon(MMDSBeacon *m)
             << " seq " << m->get_seq()
             << " rtt " << rtt << dendl;
 
+    if (was_laggy && rtt < g_conf.mds_beacon_grace) {
+      dout(0) << "handle_mds_beacon no longer laggy" << dendl;
+      was_laggy = false;
+      laggy_until = now;
+    }
+
     // clean up seq_stamp map
     while (!beacon_seq_stamp.empty() &&
            beacon_seq_stamp.begin()->first <= seq)
       beacon_seq_stamp.erase(beacon_seq_stamp.begin());
-
-    if (laggy && rtt < g_conf.mds_beacon_grace) {
-      dout(1) << " clearing laggy flag" << dendl;
-      laggy = false;
-      laggy_until = now;
-      last_tick = now;  // so that tick() will start up again
-      queue_waiters(waiting_for_nolaggy);
-    }
 
     reset_beacon_killer();
   } else {
@@ -685,9 +693,8 @@ void MDS::beacon_kill(utime_t lab)
 {
   if (lab == beacon_last_acked_stamp) {
     dout(0) << "beacon_kill last_acked_stamp " << lab
-            << ", setting laggy flag."
+            << ", we are laggy!"
             << dendl;
-    laggy = true;
     //suicide();
   } else {
     dout(20) << "beacon_kill last_acked_stamp " << beacon_last_acked_stamp
@@ -1627,13 +1634,6 @@ bool MDS::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for
   return *authorizer != NULL;
 }
 
-/* If this function returns true, it has put the message. If it returns false,
- * it has not put the message. */
-bool MDS::_dispatch(Message *m)
-{
-  bool check_from = false;
-
-  utime_t req_start = g_clock.now();
 
 #define ALLOW_MESSAGES_FROM(peers) \
 do { \
@@ -1643,34 +1643,15 @@ do { \
     m->put(); \
     return true; \
   } \
-  check_from = true; \
 } while (0)
 
-  // from bad mds?
-  if (m->get_source().is_mds()) {
-    int from = m->get_source().num();
-    if (!mdsmap->have_inst(from) ||
-        mdsmap->get_inst(from) != m->get_source_inst() ||
-        mdsmap->is_down(from)) {
-      // bogus mds?
-      if (m->get_type() == CEPH_MSG_MDS_MAP) {
-        dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
-                << ", but it's an mdsmap, looking at it" << dendl;
-      } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
-                 mdsmap->get_inst(from) == m->get_source_inst()) {
-        dout(5) << "got " << *m << " from down mds " << m->get_source()
-                << ", but it's a cache_expire, looking at it" << dendl;
-      } else {
-        dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
-                << ", dropping" << dendl;
-        m->put();
-        return true;
-      }
-    }
-  }
 
+/*
+ * high priority messages we always process
+ */
+bool MDS::handle_core_message(Message *m)
+{
   switch (m->get_type()) {
-
   case CEPH_MSG_MON_MAP:
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     m->put();
    break;
@@ -1691,137 +1672,182 @@ do { \
     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     handle_command((MMonCommand*)m);
     break;
-
+
+    // OSD
+  case CEPH_MSG_OSD_OPREPLY:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
+    objecter->handle_osd_op_reply((class MOSDOpReply*)m);
+    break;
+  case CEPH_MSG_OSD_MAP:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
+    objecter->handle_osd_map((MOSDMap*)m);
+    if (is_active() && snapserver)
+      snapserver->check_osd_map(true);
+    break;
+  default:
+    return false;
+  }
+  return true;
+}
+
+/*
+ * lower priority messages we defer if we seem laggy
+ */
+bool MDS::handle_deferrable_message(Message *m)
+{
+  int port = m->get_type() & 0xff00;
+
+  switch (port) {
+  case MDS_PORT_CACHE:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
+    mdcache->dispatch(m);
+    break;
 
-  if (laggy) {
-    dout(10) << "laggy, deferring " << *m << dendl;
-    waiting_for_nolaggy.push_back(new C_MDS_RetryMessage(this, m));
-  } else {
-    int port = m->get_type() & 0xff00;
-    switch (port) {
-    case MDS_PORT_CACHE:
-      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
-      mdcache->dispatch(m);
-      break;
-
-    case MDS_PORT_MIGRATOR:
-      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
-      mdcache->migrator->dispatch(m);
-      break;
-
-    default:
-      switch (m->get_type()) {
-        // SERVER
-      case CEPH_MSG_CLIENT_SESSION:
-      case CEPH_MSG_CLIENT_REQUEST:
-      case CEPH_MSG_CLIENT_RECONNECT:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
-        server->dispatch(m);
-        break;
-      case MSG_MDS_SLAVE_REQUEST:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
-        server->dispatch(m);
-        break;
-
-      case MSG_MDS_HEARTBEAT:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
-        balancer->proc_message(m);
-        break;
+  case MDS_PORT_MIGRATOR:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
+    mdcache->migrator->dispatch(m);
+    break;
+
+  default:
+    switch (m->get_type()) {
+      // SERVER
+    case CEPH_MSG_CLIENT_SESSION:
+    case CEPH_MSG_CLIENT_REQUEST:
+    case CEPH_MSG_CLIENT_RECONNECT:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
+      server->dispatch(m);
+      break;
+    case MSG_MDS_SLAVE_REQUEST:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
+      server->dispatch(m);
+      break;
+
+    case MSG_MDS_HEARTBEAT:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
+      balancer->proc_message(m);
+      break;
 
+    case MSG_MDS_TABLE_REQUEST:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
+      {
+        MMDSTableRequest *req = (MMDSTableRequest*)m;
+        if (req->op < 0) {
+          MDSTableClient *client = get_table_client(req->table);
           client->handle_request(req);
-        } else {
-          MDSTableServer *server = get_table_server(req->table);
-          server->handle_request(req);
-        }
-      }
-      break;
-
-      case MSG_MDS_LOCK:
-      case MSG_MDS_INODEFILECAPS:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
-        locker->dispatch(m);
-        break;
-
-      case CEPH_MSG_CLIENT_CAPS:
-      case CEPH_MSG_CLIENT_CAPRELEASE:
-      case CEPH_MSG_CLIENT_LEASE:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
-        locker->dispatch(m);
-        break;
-
-        // OSD
-      case CEPH_MSG_OSD_OPREPLY:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
-        objecter->handle_osd_op_reply((class MOSDOpReply*)m);
-        break;
-      case CEPH_MSG_OSD_MAP:
-        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
-        objecter->handle_osd_map((MOSDMap*)m);
-        if (is_active() && snapserver)
-          snapserver->check_osd_map(true);
-        break;
-
-      default:
-        return false;
+      } else {
+        MDSTableServer *server = get_table_server(req->table);
+        server->handle_request(req);
       }
     }
+      break;
+
+    case MSG_MDS_LOCK:
+    case MSG_MDS_INODEFILECAPS:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
+      locker->dispatch(m);
+      break;
+
+    case CEPH_MSG_CLIENT_CAPS:
+    case CEPH_MSG_CLIENT_CAPRELEASE:
+    case CEPH_MSG_CLIENT_LEASE:
+      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
+      locker->dispatch(m);
+      break;
+    default:
+      return false;
     }
   }
-  //assert(check_from);
-
-
-  if (laggy)
-    return true;
+  return true;
+}
 
+/* If this function returns true, it has put the message. If it returns false,
+ * it has not put the message. */
+bool MDS::_dispatch(Message *m)
+{
+  // from bad mds?
+  if (m->get_source().is_mds()) {
+    int from = m->get_source().num();
+    if (!mdsmap->have_inst(from) ||
+        mdsmap->get_inst(from) != m->get_source_inst() ||
+        mdsmap->is_down(from)) {
+      // bogus mds?
+      if (m->get_type() == CEPH_MSG_MDS_MAP) {
+        dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
+                << ", but it's an mdsmap, looking at it" << dendl;
+      } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
+                 mdsmap->get_inst(from) == m->get_source_inst()) {
+        dout(5) << "got " << *m << " from down mds " << m->get_source()
+                << ", but it's a cache_expire, looking at it" << dendl;
+      } else {
+        dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
+                << ", dropping" << dendl;
+        m->put();
+        return true;
+      }
+    }
+  }
 
-  // finish any triggered contexts
-  static bool finishing = false;
-  if (!finishing) {
-    while (finished_queue.size()) {
-      dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
-      dout(10) << finished_queue << dendl;
-      finishing = true;
-      list<Context*> ls;
-      ls.swap(finished_queue);
-      while (!ls.empty()) {
-        dout(10) << " finish " << ls.front() << dendl;
-        ls.front()->finish(0);
-        delete ls.front();
-        ls.pop_front();
+  // core
+  if (!handle_core_message(m)) {
+    if (is_laggy()) {
+      dout(10) << " laggy, deferring " << *m << dendl;
+      waiting_for_nolaggy.push_back(m);
+    } else {
+      if (!handle_deferrable_message(m)) {
+        dout(0) << "unrecognized message " << *m << dendl;
+        m->put();
+        return false;
       }
-      finishing = false;
     }
+  }
 
-  // done with all client replayed requests?
-  if (is_clientreplay() &&
-      mdcache->is_open() &&
-      replay_queue.empty() &&
-      want_state == MDSMap::STATE_CLIENTREPLAY) {
-    dout(10) << " still have " << mdcache->get_num_active_requests()
-             << " active replay requests" << dendl;
-    if (mdcache->get_num_active_requests() == 0)
-      clientreplay_done();
+  // finish any triggered contexts
+  while (finished_queue.size()) {
+    dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
+    dout(10) << finished_queue << dendl;
+    list<Context*> ls;
+    ls.swap(finished_queue);
+    while (!ls.empty()) {
+      dout(10) << " finish " << ls.front() << dendl;
+      ls.front()->finish(0);
+      delete ls.front();
+      ls.pop_front();
+
+      // give other threads (beacon!) a chance
+      mds_lock.Unlock();
+      mds_lock.Lock();
     }
   }
 
-  utime_t duration = g_clock.now();
-  duration -= req_start;
-  if (duration > g_conf.mds_session_timeout / 2) {
-    dout(0) << " dispatch took " << duration << " > " << g_conf.mds_tick_interval
-            << " seconds, setting laggy flag" << dendl;
-    laggy = true;
-    return true;
+  while (!waiting_for_nolaggy.empty()) {
+
+    // stop if we're laggy now!
+    if (is_laggy())
+      return true;
+
+    Message *m = waiting_for_nolaggy.front();
+    waiting_for_nolaggy.pop_front();
+    dout(7) << " processing laggy deferred " << *m << dendl;
+    handle_deferrable_message(m);
+
+    // give other threads (beacon!) a chance
+    mds_lock.Unlock();
+    mds_lock.Lock();
   }
 
+  // done with all client replayed requests?
+  if (is_clientreplay() &&
+      mdcache->is_open() &&
+      replay_queue.empty() &&
+      want_state == MDSMap::STATE_CLIENTREPLAY) {
+    dout(10) << " still have " << mdcache->get_num_active_requests()
+             << " active replay requests" << dendl;
+    if (mdcache->get_num_active_requests() == 0)
+      clientreplay_done();
+  }
 
   // hack: thrash exports
   static utime_t start;
diff --git a/src/mds/MDS.h b/src/mds/MDS.h
index 2efe9ab4278ed..86c22a6d15b1e 100644
--- a/src/mds/MDS.h
+++ b/src/mds/MDS.h
@@ -193,7 +193,7 @@ class MDS : public Dispatcher {
   list<Context*> waiting_for_active, waiting_for_replay, waiting_for_reconnect;
   list<Context*> replay_queue;
   map<int, list<Context*> > waiting_for_active_peer;
-  list<Context*> waiting_for_nolaggy;
+  list<Message*> waiting_for_nolaggy;
   map<int, version_t> peer_mdsmap_epoch;
 
@@ -262,11 +262,10 @@ class MDS : public Dispatcher {
   version_t beacon_last_seq;               // last seq sent to monitor
   map<version_t,utime_t> beacon_seq_stamp; // seq # -> time sent
   utime_t beacon_last_acked_stamp;         // last time we sent a beacon that got acked
-  bool laggy;
+  bool was_laggy;
   utime_t laggy_until;
-  utime_t last_tick;
 
-  bool is_laggy() { return laggy; }
+  bool is_laggy();
   utime_t get_laggy_until() { return laggy_until; }
 
   class C_MDS_BeaconSender : public Context {
@@ -324,7 +323,9 @@ class MDS : public Dispatcher {
   bool ms_verify_authorizer(Connection *con, int peer_type,
                             int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
                             bool& isvalid);
-
+  void ms_handle_connect(Connection *con);
+  bool ms_handle_reset(Connection *con);
+  void ms_handle_remote_reset(Connection *con);
 
  public:
   MDS(const std::string &n, Messenger *m, MonClient *mc);
@@ -394,11 +395,10 @@ class MDS : public Dispatcher {
 
   // messages
   bool _dispatch(Message *m);
-
-  void ms_handle_connect(Connection *con);
-  bool ms_handle_reset(Connection *con);
-  void ms_handle_remote_reset(Connection *con);
+  bool handle_core_message(Message *m);
+  bool handle_deferrable_message(Message *m);
+
+  // special message types
   void handle_command(class MMonCommand *m);
   void handle_mds_map(class MMDSMap *m);
-- 
2.39.5
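
[Editor's note, not part of the commit: both drain loops in the new
_dispatch() briefly drop and retake mds_lock between items ("give other
threads (beacon!) a chance"), so the beacon sender can run while a long
queue is drained and is_laggy() stays meaningful mid-loop.  A minimal
sketch of that pattern, using a std::mutex stand-in rather than the Ceph
Mutex class:]

// Build: g++ -std=c++11 drain_sketch.cc -pthread
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>

std::mutex big_lock;  // stand-in for MDS::mds_lock
std::deque<std::function<void()>> queued_work;

void drain() {
  std::unique_lock<std::mutex> l(big_lock);
  while (!queued_work.empty()) {
    auto item = std::move(queued_work.front());
    queued_work.pop_front();
    item();        // process one item under the lock...
    l.unlock();    // ...then briefly yield the lock so other
    l.lock();      // threads (e.g. a beacon sender) can run
  }
}

int main() {
  queued_work.push_back([] { std::cout << "finish context\n"; });
  queued_work.push_back([] { std::cout << "process deferred message\n"; });
  drain();
}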