From: Yan, Zheng Date: Thu, 20 Jul 2017 07:24:53 +0000 (+0800) Subject: mds: notify all mds about prepared snaptable update X-Git-Tag: v13.1.0~413^2~36 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=30301f7fa5a6d17bda1c150e45a6fb57d7be6469;p=ceph.git mds: notify all mds about prepared snaptable update After snaptable update get prepared, push the update preparation to all active snaptable clients, then send reply to update initiator. By this way, the initiator know that all mds have record the update preparation in their cache. When committing the snaptable update, the initiator notifies all mds about the commit. Bystander mds' snaptable cache get synchronized when it receives the notification. Signed-off-by: "Yan, Zheng" --- diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index e63061b2e822..2ef861880cd1 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -1800,8 +1800,11 @@ void MDSRankDispatcher::handle_mds_map( oldmap->get_stopped_mds_set(oldstopped); mdsmap->get_stopped_mds_set(stopped); for (set::iterator p = stopped.begin(); p != stopped.end(); ++p) - if (oldstopped.count(*p) == 0) // newly so? + if (oldstopped.count(*p) == 0) { // newly so? mdcache->migrator->handle_mds_failure_or_stop(*p); + if (mdsmap->get_tableserver() == whoami) + snapserver->handle_mds_failure_or_stop(*p); + } } { @@ -1866,6 +1869,9 @@ void MDSRank::handle_mds_failure(mds_rank_t who) mdcache->handle_mds_failure(who); + if (mdsmap->get_tableserver() == whoami) + snapserver->handle_mds_failure_or_stop(who); + snapclient->handle_mds_failure(who); } diff --git a/src/mds/MDSTableClient.cc b/src/mds/MDSTableClient.cc index 7d5c03a36d0d..61b901e2f7f5 100644 --- a/src/mds/MDSTableClient.cc +++ b/src/mds/MDSTableClient.cc @@ -60,6 +60,10 @@ void MDSTableClient::handle_request(class MMDSTableRequest *m) case TABLESERVER_OP_QUERY_REPLY: handle_query_result(m); break; + + case TABLESERVER_OP_NOTIFY_PREP: + handle_notify_prep(m); + break; case TABLESERVER_OP_AGREE: if (pending_prepare.count(reqid)) { diff --git a/src/mds/MDSTableClient.h b/src/mds/MDSTableClient.h index 0bd4f78018a1..ac4e66d072dd 100644 --- a/src/mds/MDSTableClient.h +++ b/src/mds/MDSTableClient.h @@ -85,6 +85,7 @@ public: // child must implement virtual void resend_queries() = 0; virtual void handle_query_result(MMDSTableRequest *m) = 0; + virtual void handle_notify_prep(MMDSTableRequest *m) = 0; virtual void notify_commit(version_t tid) = 0; // and friendly front-end for _prepare. diff --git a/src/mds/MDSTableServer.cc b/src/mds/MDSTableServer.cc index 88db122ab9c5..0cc505811fde 100644 --- a/src/mds/MDSTableServer.cc +++ b/src/mds/MDSTableServer.cc @@ -34,6 +34,7 @@ void MDSTableServer::handle_request(MMDSTableRequest *req) case TABLESERVER_OP_PREPARE: return handle_prepare(req); case TABLESERVER_OP_COMMIT: return handle_commit(req); case TABLESERVER_OP_ROLLBACK: return handle_rollback(req); + case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req); default: assert(0 == "unrecognized mds_table_server request op"); } } @@ -83,10 +84,42 @@ void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid) MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid); reply->bl = req->bl; - mds->send_message_mds(reply, from); + + if (_notify_prep(tid)) { + auto& p = pending_notifies[tid]; + p.notify_ack_gather = active_clients; + p.mds = from; + p.reply = reply; + } else { + mds->send_message_mds(reply, from); + } req->put(); } +void MDSTableServer::handle_notify_ack(MMDSTableRequest *m) +{ + dout(7) << __func__ << " " << *m << dendl; + mds_rank_t from = mds_rank_t(m->get_source().num()); + version_t tid = m->get_tid(); + + auto p = pending_notifies.find(tid); + if (p != pending_notifies.end()) { + if (p->second.notify_ack_gather.erase(from)) { + if (p->second.notify_ack_gather.empty()) { + if (p->second.onfinish) + p->second.onfinish->complete(0); + else + mds->send_message_mds(p->second.reply, p->second.mds); + pending_notifies.erase(p); + } + } else { + dout(0) << "got unexpected notify ack for tid " << tid << " from mds." << from << dendl; + } + } else { + } + m->put(); +} + class C_Commit : public MDSLogContextBase { MDSTableServer *server; MMDSTableRequest *req; @@ -234,29 +267,82 @@ void MDSTableServer::_server_update_logged(bufferlist& bl) _note_server_update(bl); } - // recovery +class C_ServerRecovery : public MDSInternalContextBase { + MDSTableServer *server; + MDSRank *get_mds() override { return server->mds; } +public: + C_ServerRecovery(MDSTableServer *s) : server(s) {} + void finish(int r) override { + server->_do_server_recovery(); + } +}; + +void MDSTableServer::_do_server_recovery() +{ + dout(7) << __func__ << " " << active_clients << dendl; + map next_reqids; + + for (auto p : pending_for_mds) { + mds_rank_t who = p.second.mds; + if (!active_clients.count(who)) + continue; + + if (p.second.reqid >= next_reqids[who]) + next_reqids[who] = p.second.reqid + 1; + + version_t tid = p.second.tid; + MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p.second.reqid, tid); + _get_reply_buffer(tid, &reply->bl); + mds->send_message_mds(reply, who); + } + + for (auto p : active_clients) { + MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]); + mds->send_message_mds(reply, p); + } + recovered = true; +} + void MDSTableServer::finish_recovery(set& active) { - dout(7) << "finish_recovery" << dendl; - for (set::iterator p = active.begin(); p != active.end(); ++p) - handle_mds_recovery(*p); // resend agrees for everyone. + dout(7) << __func__ << dendl; + + active_clients = active; + + // don't know if survivor mds have received all 'notify prep' messages. + // so we need to send 'notify prep' again. + if (!pending_for_mds.empty() && _notify_prep(version)) { + auto& q = pending_notifies[version]; + q.notify_ack_gather = active_clients; + q.mds = MDS_RANK_NONE; + q.onfinish = new C_ServerRecovery(this); + } else { + _do_server_recovery(); + } } void MDSTableServer::handle_mds_recovery(mds_rank_t who) { dout(7) << "handle_mds_recovery mds." << who << dendl; + active_clients.insert(who); + if (!recovered) { + dout(7) << " still not recovered, delaying" << dendl; + return; + } + uint64_t next_reqid = 0; // resend agrees for recovered mds - for (map::iterator p = pending_for_mds.begin(); - p != pending_for_mds.end(); - ++p) { + for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) { if (p->second.mds != who) continue; + assert(!pending_notifies.count(p->second.tid)); + if (p->second.reqid >= next_reqid) next_reqid = p->second.reqid + 1; + MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid); _get_reply_buffer(p->second.tid, &reply->bl); mds->send_message_mds(reply, who); @@ -265,3 +351,35 @@ void MDSTableServer::handle_mds_recovery(mds_rank_t who) MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid); mds->send_message_mds(reply, who); } + +void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who) +{ + dout(7) << __func__ << " mds." << who << dendl; + + active_clients.erase(who); + + list rollback; + for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) { + auto q = p++; + if (q->second.mds == who) { + // haven't sent reply yet. + rollback.push_back(q->second.reply); + pending_notifies.erase(q); + } else if (q->second.notify_ack_gather.erase(who)) { + // the failed mds will reload snaptable when it recovers. + // so we can remove it from the gather set. + if (q->second.notify_ack_gather.empty()) { + if (q->second.onfinish) + q->second.onfinish->complete(0); + else + mds->send_message_mds(q->second.reply, q->second.mds); + pending_notifies.erase(q); + } + } + } + + for (auto p : rollback) { + p->op = TABLESERVER_OP_ROLLBACK; + handle_rollback(p); + } +} diff --git a/src/mds/MDSTableServer.h b/src/mds/MDSTableServer.h index 55a380bdb5ea..a9edead457c6 100644 --- a/src/mds/MDSTableServer.h +++ b/src/mds/MDSTableServer.h @@ -22,10 +22,21 @@ class MMDSTableRequest; class MDSTableServer : public MDSTable { protected: int table; + bool recovered; + set active_clients; private: map pending_for_mds; // ** child should encode this! ** set committing_tids; + struct notify_info_t { + set notify_ack_gather; + mds_rank_t mds; + MMDSTableRequest *reply; + MDSInternalContextBase *onfinish; + notify_info_t() : reply(NULL), onfinish(NULL) {} + }; + map pending_notifies; + void handle_prepare(MMDSTableRequest *m); void _prepare_logged(MMDSTableRequest *m, version_t tid); friend class C_Prepare; @@ -41,6 +52,8 @@ private: void _server_update_logged(bufferlist& bl); friend class C_ServerUpdate; + void handle_notify_ack(MMDSTableRequest *m); + public: virtual void handle_query(MMDSTableRequest *m) = 0; virtual void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) = 0; @@ -48,6 +61,7 @@ public: virtual void _commit(version_t tid, MMDSTableRequest *req=NULL) = 0; virtual void _rollback(version_t tid) = 0; virtual void _server_update(bufferlist& bl) { ceph_abort(); } + virtual bool _notify_prep(version_t tid) { return false; }; void _note_prepare(mds_rank_t mds, uint64_t reqid, bool replay=false) { version++; @@ -75,7 +89,8 @@ public: projected_version = version; } - MDSTableServer(MDSRank *m, int tab) : MDSTable(m, get_mdstable_name(tab), false), table(tab) {} + MDSTableServer(MDSRank *m, int tab) : + MDSTable(m, get_mdstable_name(tab), false), table(tab), recovered(false) {} ~MDSTableServer() override {} void handle_request(MMDSTableRequest *m); @@ -95,7 +110,11 @@ public: // recovery void finish_recovery(set& active); + void _do_server_recovery(); + friend class C_ServerRecovery; + void handle_mds_recovery(mds_rank_t who); + void handle_mds_failure_or_stop(mds_rank_t who); }; #endif diff --git a/src/mds/Server.cc b/src/mds/Server.cc index c44bdc92aeec..06ddb0c7dcb8 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -8684,11 +8684,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr) decode(snapid, p); dout(10) << " stid " << stid << " snapid " << snapid << dendl; - // FIXME: notify all other mds the change - if (stid > mds->snapclient->get_cached_version()) { - mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr)); - return; - } + assert(mds->snapclient->get_cached_version() >= stid); // journal SnapInfo info; @@ -8827,11 +8823,7 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr) decode(seq, p); dout(10) << " stid is " << stid << ", seq is " << seq << dendl; - // FIXME: notify all other mds the change - if (stid > mds->snapclient->get_cached_version()) { - mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr)); - return; - } + assert(mds->snapclient->get_cached_version() >= stid); // journal auto &pi = diri->project_inode(false, true); @@ -8973,11 +8965,7 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr) version_t stid = mdr->more()->stid; dout(10) << " stid is " << stid << dendl; - // FIXME: notify all other mds the change - if (stid > mds->snapclient->get_cached_version()) { - mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr)); - return; - } + assert(mds->snapclient->get_cached_version() >= stid); // journal auto &pi = diri->project_inode(false, true); diff --git a/src/mds/SnapClient.cc b/src/mds/SnapClient.cc index 51bc29e886c5..e96000ebac26 100644 --- a/src/mds/SnapClient.cc +++ b/src/mds/SnapClient.cc @@ -85,7 +85,7 @@ void SnapClient::handle_query_result(MMDSTableRequest *m) } } - if (m->reqid >= sync_reqid) + if (m->op == TABLESERVER_OP_QUERY_REPLY && m->reqid >= sync_reqid) synced = true; if (synced && !waiting_for_version.empty()) { @@ -102,6 +102,14 @@ void SnapClient::handle_query_result(MMDSTableRequest *m) } } +void SnapClient::handle_notify_prep(MMDSTableRequest *m) +{ + dout(10) << __func__ << " " << *m << dendl; + handle_query_result(m); + MMDSTableRequest *ack = new MMDSTableRequest(table, TABLESERVER_OP_NOTIFY_ACK, 0, m->get_tid()); + mds->send_message(ack, m->get_connection()); +} + void SnapClient::notify_commit(version_t tid) { dout(10) << __func__ << " tid " << tid << dendl; diff --git a/src/mds/SnapClient.h b/src/mds/SnapClient.h index 6182493ec401..e71d9a1b8497 100644 --- a/src/mds/SnapClient.h +++ b/src/mds/SnapClient.h @@ -45,6 +45,7 @@ public: void resend_queries() override; void handle_query_result(MMDSTableRequest *m) override; + void handle_notify_prep(MMDSTableRequest *m) override; void notify_commit(version_t tid) override; void prepare_create(inodeno_t dirino, std::string_view name, utime_t stamp, diff --git a/src/mds/SnapServer.cc b/src/mds/SnapServer.cc index 79e58d91b628..824f02559f8c 100644 --- a/src/mds/SnapServer.cc +++ b/src/mds/SnapServer.cc @@ -246,8 +246,29 @@ void SnapServer::_server_update(bufferlist& bl) } } +bool SnapServer::_notify_prep(version_t tid) +{ + using ceph::encode; + bufferlist bl; + char type = 'F'; + encode(type, bl); + encode(snaps, bl); + encode(pending_update, bl); + encode(pending_destroy, bl); + assert(version == tid); + + for (auto p : active_clients) { + MMDSTableRequest *m = new MMDSTableRequest(table, TABLESERVER_OP_NOTIFY_PREP, 0, version); + m->bl = bl; + mds->send_message_mds(m, p); + } + return true; +} + void SnapServer::handle_query(MMDSTableRequest *req) { + using ceph::encode; + using ceph::decode; char op; bufferlist::iterator p = req->bl.begin(); decode(op, p); diff --git a/src/mds/SnapServer.h b/src/mds/SnapServer.h index 1ad34bd5ca56..517f00928978 100644 --- a/src/mds/SnapServer.h +++ b/src/mds/SnapServer.h @@ -34,12 +34,6 @@ protected: version_t last_checked_osdmap; -public: - SnapServer(MDSRank *m, MonClient *monc) - : MDSTableServer(m, TABLE_SNAP), mon_client(monc), last_checked_osdmap(0) - {} - - void reset_state() override; void encode_server_state(bufferlist& bl) const override { ENCODE_START(3, 3, bl); encode(last_snap, bl); @@ -68,26 +62,33 @@ public: DECODE_FINISH(bl); } - // To permit enc/decoding in isolation in dencoder - SnapServer() : MDSTableServer(NULL, TABLE_SNAP), last_checked_osdmap(0) {} - void encode(bufferlist& bl) const { - encode_server_state(bl); - } - void decode(bufferlist::iterator& bl) { - decode_server_state(bl); - } - void dump(Formatter *f) const; - static void generate_test_instances(list& ls); - // server bits void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) override; void _get_reply_buffer(version_t tid, bufferlist *pbl) const override; void _commit(version_t tid, MMDSTableRequest *req=NULL) override; void _rollback(version_t tid) override; void _server_update(bufferlist& bl) override; + bool _notify_prep(version_t tid) override; void handle_query(MMDSTableRequest *m) override; +public: + SnapServer(MDSRank *m, MonClient *monc) + : MDSTableServer(m, TABLE_SNAP), mon_client(monc), last_checked_osdmap(0) {} + SnapServer() : MDSTableServer(NULL, TABLE_SNAP), last_checked_osdmap(0) {} + + void reset_state() override; + void check_osd_map(bool force); + + void encode(bufferlist& bl) const { + encode_server_state(bl); + } + void decode(bufferlist::iterator& bl) { + decode_server_state(bl); + } + + void dump(Formatter *f) const; + static void generate_test_instances(list& ls); }; WRITE_CLASS_ENCODER(SnapServer) diff --git a/src/mds/mds_table_types.h b/src/mds/mds_table_types.h index 4003068bfd30..bd7ad47d59c2 100644 --- a/src/mds/mds_table_types.h +++ b/src/mds/mds_table_types.h @@ -40,6 +40,8 @@ enum { TABLESERVER_OP_ROLLBACK = 7, TABLESERVER_OP_SERVER_UPDATE = 8, TABLESERVER_OP_SERVER_READY = -9, + TABLESERVER_OP_NOTIFY_ACK = 10, + TABLESERVER_OP_NOTIFY_PREP = -11, }; inline const char *get_mdstableserver_opname(int op) { @@ -53,6 +55,8 @@ inline const char *get_mdstableserver_opname(int op) { case TABLESERVER_OP_ROLLBACK: return "rollback"; case TABLESERVER_OP_SERVER_UPDATE: return "server_update"; case TABLESERVER_OP_SERVER_READY: return "server_ready"; + case TABLESERVER_OP_NOTIFY_ACK: return "notify_ack"; + case TABLESERVER_OP_NOTIFY_PREP: return "notify_prep"; default: ceph_abort(); return 0; } }