From 7cea76f9307e23ab1fdaa1df7ddb3e3bafeb3edb Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 15 Jul 2017 17:35:33 +0800 Subject: [PATCH] mds: make change to mds table after corresponding event get logged Signed-off-by: "Yan, Zheng" --- src/mds/MDSTableServer.cc | 110 ++++++++++++++++++++++++++++++-------- src/mds/MDSTableServer.h | 37 +++++++++---- src/mds/SnapServer.cc | 14 +---- src/mds/SnapServer.h | 2 +- src/mds/journal.cc | 7 +-- 5 files changed, 121 insertions(+), 49 deletions(-) diff --git a/src/mds/MDSTableServer.cc b/src/mds/MDSTableServer.cc index 4552a469637ac..ee9d29ffaef12 100644 --- a/src/mds/MDSTableServer.cc +++ b/src/mds/MDSTableServer.cc @@ -57,29 +57,33 @@ void MDSTableServer::handle_prepare(MMDSTableRequest *req) { dout(7) << "handle_prepare " << *req << dendl; mds_rank_t from = mds_rank_t(req->get_source().num()); - bufferlist bl = req->bl; - - _prepare(req->bl, req->reqid, from); - _note_prepare(from, req->reqid); assert(g_conf->mds_kill_mdstable_at != 1); - ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, version, version); + projected_version++; + + ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, + projected_version, projected_version); mds->mdlog->start_entry(le); - le->mutation = bl; // original request, NOT modified return value coming out of _prepare! - mds->mdlog->submit_entry(le, new C_Prepare(this, req, version)); + le->mutation = req->bl; + mds->mdlog->submit_entry(le, new C_Prepare(this, req, projected_version)); mds->mdlog->flush(); } void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid) { dout(7) << "_create_logged " << *req << " tid " << tid << dendl; + mds_rank_t from = mds_rank_t(req->get_source().num()); assert(g_conf->mds_kill_mdstable_at != 2); + _note_prepare(from, req->reqid); + _prepare(req->bl, req->reqid, from); + assert(version == tid); + MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid); reply->bl = req->bl; - mds->send_message_mds(reply, mds_rank_t(req->get_source().num())); + mds->send_message_mds(reply, from); req->put(); } @@ -104,21 +108,27 @@ void MDSTableServer::handle_commit(MMDSTableRequest *req) if (pending_for_mds.count(tid)) { + if (committing_tids.count(tid)) { + dout(0) << "got commit for tid " << tid << ", already committing, waiting." << dendl; + req->put(); + return; + } + assert(g_conf->mds_kill_mdstable_at != 5); - if (!_commit(tid, req)) - return; + projected_version++; + committing_tids.insert(tid); - _note_commit(tid); mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE, - tid, version), + tid, projected_version), new C_Commit(this, req)); } else if (tid <= version) { - dout(0) << "got commit for tid " << tid << " <= " << version - << ", already committed, sending ack." - << dendl; - _commit_logged(req); + dout(0) << "got commit for tid " << tid << " <= " << version + << ", already committed, sending ack." << dendl; + MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, tid); + mds->send_message(reply, req->get_connection()); + req->put(); } else { // wtf. @@ -133,39 +143,95 @@ void MDSTableServer::_commit_logged(MMDSTableRequest *req) dout(7) << "_commit_logged, sending ACK" << dendl; assert(g_conf->mds_kill_mdstable_at != 6); + version_t tid = req->get_tid(); + + pending_for_mds.erase(tid); + committing_tids.erase(tid); + + _commit(tid, req); + _note_commit(tid); MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid()); mds->send_message_mds(reply, mds_rank_t(req->get_source().num())); req->put(); } +class C_Rollback : public MDSLogContextBase { + MDSTableServer *server; + MMDSTableRequest *req; + MDSRank *get_mds() override { return server->mds; } +public: + C_Rollback(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {} + void finish(int r) override { + server->_rollback_logged(req); + } +}; + // ROLLBACK -/* This function DOES put the passed message before returning */ void MDSTableServer::handle_rollback(MMDSTableRequest *req) { dout(7) << "handle_rollback " << *req << dendl; version_t tid = req->get_tid(); assert(pending_for_mds.count(tid)); + assert(!committing_tids.count(tid)); + + projected_version++; + committing_tids.insert(tid); + + mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE, + tid, projected_version), + new C_Rollback(this, req)); +} + +/* This function DOES put the passed message before returning */ +void MDSTableServer::_rollback_logged(MMDSTableRequest *req) +{ + dout(7) << "_rollback_logged " << *req << dendl; + + assert(g_conf->mds_kill_mdstable_at != 7); + version_t tid = req->get_tid(); + + pending_for_mds.erase(tid); + committing_tids.erase(tid); + _rollback(tid); _note_rollback(tid); - mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE, - tid, version)); + req->put(); } // SERVER UPDATE +class C_ServerUpdate : public MDSLogContextBase { + MDSTableServer *server; + bufferlist bl; + MDSRank *get_mds() override { return server->mds; } +public: + C_ServerUpdate(MDSTableServer *s, bufferlist &b) : server(s), bl(b) {} + void finish(int r) override { + server->_server_update_logged(bl); + } +}; void MDSTableServer::do_server_update(bufferlist& bl) { dout(10) << "do_server_update len " << bl.length() << dendl; - _server_update(bl); - ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, version); + + projected_version++; + + ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, projected_version); mds->mdlog->start_entry(le); le->mutation = bl; - mds->mdlog->submit_entry(le); + mds->mdlog->submit_entry(le, new C_ServerUpdate(this, bl)); +} + +void MDSTableServer::_server_update_logged(bufferlist& bl) +{ + dout(10) << "_server_update_logged len " << bl.length() << dendl; + _server_update(bl); + _note_server_update(bl); } diff --git a/src/mds/MDSTableServer.h b/src/mds/MDSTableServer.h index f7de717c87d8f..34bca0519e22e 100644 --- a/src/mds/MDSTableServer.h +++ b/src/mds/MDSTableServer.h @@ -20,12 +20,12 @@ class MMDSTableRequest; class MDSTableServer : public MDSTable { -public: +protected: int table; +private: map pending_for_mds; // ** child should encode this! ** + set committing_tids; - -private: void handle_prepare(MMDSTableRequest *m); void _prepare_logged(MMDSTableRequest *m, version_t tid); friend class C_Prepare; @@ -34,28 +34,45 @@ private: void _commit_logged(MMDSTableRequest *m); friend class C_Commit; - void handle_rollback(MMDSTableRequest *m); + void _rollback_logged(MMDSTableRequest *m); + friend class C_Rollback; + + void _server_update_logged(bufferlist& bl); + friend class C_ServerUpdate; - public: +public: virtual void handle_query(MMDSTableRequest *m) = 0; virtual void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) = 0; - virtual bool _commit(version_t tid, MMDSTableRequest *req=NULL) = 0; + virtual void _commit(version_t tid, MMDSTableRequest *req=NULL) = 0; virtual void _rollback(version_t tid) = 0; virtual void _server_update(bufferlist& bl) { ceph_abort(); } - void _note_prepare(mds_rank_t mds, uint64_t reqid) { + void _note_prepare(mds_rank_t mds, uint64_t reqid, bool replay=false) { + version++; + if (replay) + projected_version = version; pending_for_mds[version].mds = mds; pending_for_mds[version].reqid = reqid; pending_for_mds[version].tid = version; } - void _note_commit(uint64_t tid) { + void _note_commit(uint64_t tid, bool replay=false) { + version++; + if (replay) + projected_version = version; pending_for_mds.erase(tid); } - void _note_rollback(uint64_t tid) { + void _note_rollback(uint64_t tid, bool replay=false) { + version++; + if (replay) + projected_version = version; pending_for_mds.erase(tid); } - + void _note_server_update(bufferlist& bl, bool replay=false) { + version++; + if (replay) + projected_version = version; + } MDSTableServer(MDSRank *m, int tab) : MDSTable(m, get_mdstable_name(tab), false), table(tab) {} ~MDSTableServer() override {} diff --git a/src/mds/SnapServer.cc b/src/mds/SnapServer.cc index ea78bff1d0874..6c37bdc0b1c01 100644 --- a/src/mds/SnapServer.cc +++ b/src/mds/SnapServer.cc @@ -72,8 +72,6 @@ void SnapServer::_prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) switch (op) { case TABLE_OP_CREATE: { - version++; - SnapInfo info; ::decode(info.ino, p); if (!p.end()) { @@ -98,7 +96,6 @@ void SnapServer::_prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) snapid_t snapid; ::decode(ino, p); // not used, currently. ::decode(snapid, p); - version++; // bump last_snap... we use it as a version value on the snaprealm. ++last_snap; @@ -120,7 +117,6 @@ void SnapServer::_prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) ::decode(info.stamp, p); info.long_name = "update"; - version++; // bump last_snap... we use it as a version value on the snaprealm. ++last_snap; pending_update[version] = info; @@ -144,7 +140,7 @@ bool SnapServer::_is_prepared(version_t tid) const pending_destroy.count(tid); } -bool SnapServer::_commit(version_t tid, MMDSTableRequest *req) +void SnapServer::_commit(version_t tid, MMDSTableRequest *req) { if (pending_update.count(tid)) { SnapInfo &info = pending_update[tid]; @@ -180,10 +176,7 @@ bool SnapServer::_commit(version_t tid, MMDSTableRequest *req) else ceph_abort(); - // bump version. - version++; //dump(); - return true; } void SnapServer::_rollback(version_t tid) @@ -212,8 +205,6 @@ void SnapServer::_rollback(version_t tid) else ceph_abort(); - // bump version. - version++; //dump(); } @@ -234,8 +225,6 @@ void SnapServer::_server_update(bufferlist& bl) if (need_to_purge[p->first].empty()) need_to_purge.erase(p->first); } - - version++; } void SnapServer::handle_query(MMDSTableRequest *req) @@ -373,5 +362,4 @@ void SnapServer::generate_test_instances(list& ls) populated->pending_noop.insert(890); ls.push_back(populated); - } diff --git a/src/mds/SnapServer.h b/src/mds/SnapServer.h index a93c9234fc3ee..058a6e2bdb2ab 100644 --- a/src/mds/SnapServer.h +++ b/src/mds/SnapServer.h @@ -82,7 +82,7 @@ public: // server bits void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) override; bool _is_prepared(version_t tid) const; - bool _commit(version_t tid, MMDSTableRequest *req=NULL) override; + void _commit(version_t tid, MMDSTableRequest *req=NULL) override; void _rollback(version_t tid) override; void _server_update(bufferlist& bl) override; void handle_query(MMDSTableRequest *m) override; diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 4e6ab183bb88d..f97e0eb8726a6 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -1937,19 +1937,20 @@ void ETableServer::replay(MDSRank *mds) switch (op) { case TABLESERVER_OP_PREPARE: + server->_note_prepare(bymds, reqid, true); server->_prepare(mutation, reqid, bymds); - server->_note_prepare(bymds, reqid); break; case TABLESERVER_OP_COMMIT: server->_commit(tid); - server->_note_commit(tid); + server->_note_commit(tid, true); break; case TABLESERVER_OP_ROLLBACK: server->_rollback(tid); - server->_note_rollback(tid); + server->_note_rollback(tid, true); break; case TABLESERVER_OP_SERVER_UPDATE: server->_server_update(mutation); + server->_note_server_update(mutation, true); break; default: mds->clog->error() << "invalid tableserver op in ETableServer"; -- 2.39.5