From f23717fbd923c366bb07751906a42ebf9e01b2d4 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 2 Jun 2009 13:33:07 -0700 Subject: [PATCH] mds: replay all old client requests before handling new requests Adds a new CLIENTREPLAY state between REJOIN and ACTIVE. It it's strictly necessary for anyone to know the MDS is handling it's backlog first, but it doesn't hurt. --- src/include/ceph_fs.h | 30 ++++++++++-------- src/mds/MDCache.h | 2 ++ src/mds/MDS.cc | 60 +++++++++++++++++++++-------------- src/mds/MDS.h | 8 ++++- src/mds/MDSMap.h | 2 ++ src/mds/Server.cc | 33 ++++++++++++------- src/messages/MClientRequest.h | 2 ++ 7 files changed, 88 insertions(+), 49 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 46ec816d010a9..bfe1c8843b264 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -327,22 +327,23 @@ struct ceph_client_ticket { * > 0 -> in * <= 0 -> out */ -#define CEPH_MDS_STATE_DNE 0 /* down, does not exist. */ -#define CEPH_MDS_STATE_STOPPED -1 /* down, once existed, but no subtrees. - empty log. */ -#define CEPH_MDS_STATE_BOOT -4 /* up, boot announcement. */ -#define CEPH_MDS_STATE_STANDBY -5 /* up, idle. waiting for assignment. */ -#define CEPH_MDS_STATE_CREATING -6 /* up, creating MDS instance. */ -#define CEPH_MDS_STATE_STARTING -7 /* up, starting previously stopped mds. */ +#define CEPH_MDS_STATE_DNE 0 /* down, does not exist. */ +#define CEPH_MDS_STATE_STOPPED -1 /* down, once existed, but no subtrees. + empty log. */ +#define CEPH_MDS_STATE_BOOT -4 /* up, boot announcement. */ +#define CEPH_MDS_STATE_STANDBY -5 /* up, idle. waiting for assignment. */ +#define CEPH_MDS_STATE_CREATING -6 /* up, creating MDS instance. */ +#define CEPH_MDS_STATE_STARTING -7 /* up, starting previously stopped mds. */ #define CEPH_MDS_STATE_STANDBY_REPLAY -8 /* up, tailing active node's journal */ -#define CEPH_MDS_STATE_REPLAY 8 /* up, replaying journal. */ -#define CEPH_MDS_STATE_RESOLVE 9 /* up, disambiguating distributed - operations (import, rename, etc.) */ -#define CEPH_MDS_STATE_RECONNECT 10 /* up, reconnect to clients */ -#define CEPH_MDS_STATE_REJOIN 11 /* up, rejoining distributed cache */ -#define CEPH_MDS_STATE_ACTIVE 12 /* up, active */ -#define CEPH_MDS_STATE_STOPPING 13 /* up, but exporting metadata */ +#define CEPH_MDS_STATE_REPLAY 8 /* up, replaying journal. */ +#define CEPH_MDS_STATE_RESOLVE 9 /* up, disambiguating distributed + operations (import, rename, etc.) */ +#define CEPH_MDS_STATE_RECONNECT 10 /* up, reconnect to clients */ +#define CEPH_MDS_STATE_REJOIN 11 /* up, rejoining distributed cache */ +#define CEPH_MDS_STATE_CLIENTREPLAY 12 /* up, replaying client operations */ +#define CEPH_MDS_STATE_ACTIVE 13 /* up, active */ +#define CEPH_MDS_STATE_STOPPING 14 /* up, but exporting metadata */ static inline const char *ceph_mds_state_name(int s) { @@ -361,6 +362,7 @@ static inline const char *ceph_mds_state_name(int s) case CEPH_MDS_STATE_RESOLVE: return "up:resolve"; case CEPH_MDS_STATE_RECONNECT: return "up:reconnect"; case CEPH_MDS_STATE_REJOIN: return "up:rejoin"; + case CEPH_MDS_STATE_CLIENTREPLAY: return "up:clientreplay"; case CEPH_MDS_STATE_ACTIVE: return "up:active"; case CEPH_MDS_STATE_STOPPING: return "up:stopping"; default: return ""; diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index e7ade93d918a8..32ef9bc644dcf 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -529,6 +529,8 @@ protected: hash_map active_requests; public: + int get_num_active_requests() { return active_requests.size(); } + MDRequest* request_start(MClientRequest *req); MDRequest* request_start_slave(metareqid_t rid, int by); MDRequest* request_start_internal(int op); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 30fb42b0b083a..5f282ca51c15b 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -630,12 +630,13 @@ void MDS::handle_mds_map(MMDSMap *m) << ceph_mds_state_name(state) << dendl; want_state = state; - // now active? + // did i just recover? + if ((is_active() || is_clientreplay()) && + (oldstate == MDSMap::STATE_REJOIN || + oldstate == MDSMap::STATE_RECONNECT)) + recovery_done(); + if (is_active()) { - // did i just recover? - if (oldstate == MDSMap::STATE_REJOIN || - oldstate == MDSMap::STATE_RECONNECT) - recovery_done(); finish_contexts(waiting_for_active); // kick waiters } else if (is_replay() || is_standby_replay()) { replay_start(); @@ -643,6 +644,8 @@ void MDS::handle_mds_map(MMDSMap *m) resolve_start(); } else if (is_reconnect()) { reconnect_start(); + } else if (is_clientreplay()) { + clientreplay_start(); } else if (is_creating()) { boot_create(); } else if (is_starting()) { @@ -966,19 +969,6 @@ void MDS::reconnect_done() request_state(MDSMap::STATE_REJOIN); // move to rejoin state mdcache->reconnect_clean_open_file_lists(); - - /* - if (mdsmap->get_num_in_mds() == 1 && - mdsmap->get_num_mds(MDSMap::STATE_FAILED) == 0) { // just me! - - // finish processing caps (normally, this happens during rejoin, but we're skipping that...) - mdcache->rejoin_gather_finish(); - - request_state(MDSMap::STATE_ACTIVE); // go active - } else { - request_state(MDSMap::STATE_REJOIN); // move to rejoin state - } - */ } void MDS::rejoin_joint_start() @@ -991,6 +981,22 @@ void MDS::rejoin_done() dout(1) << "rejoin_done" << dendl; mdcache->show_subtrees(); mdcache->show_cache(); + + if (waiting_for_replay.empty()) + request_state(MDSMap::STATE_ACTIVE); + else + request_state(MDSMap::STATE_CLIENTREPLAY); +} + +void MDS::clientreplay_start() +{ + dout(1) << "clientreplay_start" << dendl; + queue_waiters(waiting_for_replay); +} + +void MDS::clientreplay_done() +{ + dout(1) << "clientreplay_done" << dendl; request_state(MDSMap::STATE_ACTIVE); } @@ -998,7 +1004,7 @@ void MDS::rejoin_done() void MDS::recovery_done() { dout(1) << "recovery_done -- successful recovery!" << dendl; - assert(is_active()); + assert(is_active() || is_clientreplay()); // kick anchortable (resent AGREEs) if (mdsmap->get_tableserver() == whoami) { @@ -1017,8 +1023,6 @@ void MDS::recovery_done() bcast_mds_map(); mdcache->populate_mydir(); - - queue_waiters(waiting_for_active); } void MDS::handle_mds_recovery(int who) @@ -1258,16 +1262,26 @@ bool MDS::_dispatch(Message *m) // finish any triggered contexts - if (finished_queue.size()) { + static bool finishing = false; + if (!finishing && finished_queue.size()) { dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl; dout(10) << finished_queue << dendl; list ls; ls.splice(ls.begin(), finished_queue); assert(finished_queue.empty()); + finishing = true; finish_contexts(ls); + finishing = false; + } else { + // done with all client replayed requests? + if (!finishing && + is_clientreplay() && + mdcache->is_open() && + mdcache->get_num_active_requests() == 0 && + want_state == MDSMap::STATE_CLIENTREPLAY) + clientreplay_done(); } - // hack: thrash exports static utime_t start; utime_t now = g_clock.now(); diff --git a/src/mds/MDS.h b/src/mds/MDS.h index ea1aeb5262c4d..afdd46d365ba7 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -162,7 +162,7 @@ class MDS : public Dispatcher { int state; // my confirmed state int want_state; // the state i want - list waiting_for_active; + list waiting_for_active, waiting_for_replay; map > waiting_for_active_peer; list waiting_for_nolaggy; @@ -177,6 +177,9 @@ class MDS : public Dispatcher { void wait_for_active_peer(int who, Context *c) { waiting_for_active_peer[who].push_back(c); } + void wait_for_replay(Context *c) { + waiting_for_replay.push_back(c); + } int get_state() { return state; } bool is_creating() { return state == MDSMap::STATE_CREATING; } @@ -186,6 +189,7 @@ class MDS : public Dispatcher { bool is_resolve() { return state == MDSMap::STATE_RESOLVE; } bool is_reconnect() { return state == MDSMap::STATE_RECONNECT; } bool is_rejoin() { return state == MDSMap::STATE_REJOIN; } + bool is_clientreplay() { return state == MDSMap::STATE_CLIENTREPLAY; } bool is_active() { return state == MDSMap::STATE_ACTIVE; } bool is_stopping() { return state == MDSMap::STATE_STOPPING; } @@ -305,6 +309,8 @@ class MDS : public Dispatcher { void rejoin_done(); void recovery_done(); void handle_mds_recovery(int who); + void clientreplay_start(); + void clientreplay_done(); void stopping_start(); void stopping_done(); diff --git a/src/mds/MDSMap.h b/src/mds/MDSMap.h index e8d8c7bf6a3d0..30a0f7d30fa7f 100644 --- a/src/mds/MDSMap.h +++ b/src/mds/MDSMap.h @@ -72,6 +72,7 @@ public: static const int STATE_RESOLVE = CEPH_MDS_STATE_RESOLVE; // up, disambiguating distributed operations (import, rename, etc.) static const int STATE_RECONNECT = CEPH_MDS_STATE_RECONNECT; // up, reconnect to clients static const int STATE_REJOIN = CEPH_MDS_STATE_REJOIN; // up, replayed journal, rejoining distributed cache + static const int STATE_CLIENTREPLAY = CEPH_MDS_STATE_CLIENTREPLAY; // up, active static const int STATE_ACTIVE = CEPH_MDS_STATE_ACTIVE; // up, active static const int STATE_STOPPING = CEPH_MDS_STATE_STOPPING; // up, exporting metadata (-> standby or out) @@ -300,6 +301,7 @@ public: bool is_resolve(int m) { return get_state(m) == STATE_RESOLVE; } bool is_reconnect(int m) { return get_state(m) == STATE_RECONNECT; } bool is_rejoin(int m) { return get_state(m) == STATE_REJOIN; } + bool is_clientreplay(int m) { return get_state(m) == STATE_CLIENTREPLAY; } bool is_active(int m) { return get_state(m) == STATE_ACTIVE; } bool is_stopping(int m) { return get_state(m) == STATE_STOPPING; } bool is_active_or_stopping(int m) { return is_active(m) || is_stopping(m); } diff --git a/src/mds/Server.cc b/src/mds/Server.cc index ad1e314dcd789..908a397f34f0b 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -99,10 +99,23 @@ void Server::dispatch(Message *m) } // active? - if (!mds->is_active() && !mds->is_stopping()) { - dout(3) << "not active yet, waiting" << dendl; - mds->wait_for_active(new C_MDS_RetryMessage(mds, m)); - return; + if (!mds->is_active() && + !(mds->is_stopping() && m->get_orig_source().is_mds())) { + if (mds->is_reconnect() && + m->get_type() == CEPH_MSG_CLIENT_REQUEST && + ((MClientRequest*)m)->is_replay()) { + dout(3) << "queuing replayed op" << dendl; + mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); + return; + } else if (mds->is_clientreplay() && + m->get_type() == CEPH_MSG_CLIENT_REQUEST && + ((MClientRequest*)m)->is_replay()) { + // replaying! + } else { + dout(3) << "not active yet, waiting" << dendl; + mds->wait_for_active(new C_MDS_RetryMessage(mds, m)); + return; + } } switch (m->get_type()) { @@ -617,6 +630,11 @@ void Server::early_reply(MDRequest *mdr, CInode *tracei, CDentry *tracedn) if (client_inst.name.is_mds()) return; + if (req->is_replay()) { + dout(10) << "early_reply - none for replay request" << dendl; + return; + } + MClientReply *reply = new MClientReply(mdr->client_request, 0); reply->set_unsafe(); @@ -821,13 +839,6 @@ void Server::handle_client_request(MClientRequest *req) if (logger) logger->inc(l_mdss_hcreq); - if (!mds->is_active() && - !(mds->is_stopping() && req->get_orig_source().is_mds())) { - dout(5) << " not active (or stopping+mds), discarding request." << dendl; - delete req; - return; - } - if (!mdcache->is_open()) { dout(5) << "waiting for root" << dendl; mdcache->wait_for_open(new C_MDS_RetryMessage(mds, req)); diff --git a/src/messages/MClientRequest.h b/src/messages/MClientRequest.h index ff8bff5821366..cca6cc35a7535 100644 --- a/src/messages/MClientRequest.h +++ b/src/messages/MClientRequest.h @@ -186,6 +186,8 @@ public: out << " " << get_filepath2(); if (head.num_retry) out << " RETRY=" << (int)head.num_retry; + if (get_flags() & CEPH_MDS_FLAG_REPLAY) + out << " REPLAY"; out << ")"; } -- 2.39.5