]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: make change to mds table after corresponding event get logged
authorYan, Zheng <zyan@redhat.com>
Sat, 15 Jul 2017 09:35:33 +0000 (17:35 +0800)
committerYan, Zheng <zyan@redhat.com>
Mon, 6 Nov 2017 02:19:18 +0000 (10:19 +0800)
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/MDSTableServer.cc
src/mds/MDSTableServer.h
src/mds/SnapServer.cc
src/mds/SnapServer.h
src/mds/journal.cc

index 4552a469637ac8d0a8b412b16fa82fec6051fc43..ee9d29ffaef1279856fb8ba424cc9532cd0e9132 100644 (file)
@@ -57,29 +57,33 @@ void MDSTableServer::handle_prepare(MMDSTableRequest *req)
 {
   dout(7) << "handle_prepare " << *req << dendl;
   mds_rank_t from = mds_rank_t(req->get_source().num());
-  bufferlist bl = req->bl;
-
-  _prepare(req->bl, req->reqid, from);
-  _note_prepare(from, req->reqid);
 
   assert(g_conf->mds_kill_mdstable_at != 1);
 
-  ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, version, version);
+  projected_version++;
+
+  ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from,
+                                     projected_version, projected_version);
   mds->mdlog->start_entry(le);
-  le->mutation = bl;  // original request, NOT modified return value coming out of _prepare!
-  mds->mdlog->submit_entry(le, new C_Prepare(this, req, version));
+  le->mutation = req->bl;
+  mds->mdlog->submit_entry(le, new C_Prepare(this, req, projected_version));
   mds->mdlog->flush();
 }
 
 void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
 {
   dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
+  mds_rank_t from = mds_rank_t(req->get_source().num());
 
   assert(g_conf->mds_kill_mdstable_at != 2);
 
+  _note_prepare(from, req->reqid);
+  _prepare(req->bl, req->reqid, from);
+  assert(version == tid);
+
   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid);
   reply->bl = req->bl;
-  mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
+  mds->send_message_mds(reply, from);
   req->put();
 }
 
@@ -104,21 +108,27 @@ void MDSTableServer::handle_commit(MMDSTableRequest *req)
 
   if (pending_for_mds.count(tid)) {
 
+    if (committing_tids.count(tid)) {
+      dout(0) << "got commit for tid " << tid << ", already committing, waiting." << dendl;
+      req->put();
+      return;
+    }
+
     assert(g_conf->mds_kill_mdstable_at != 5);
 
-    if (!_commit(tid, req))
-      return;
+    projected_version++;
+    committing_tids.insert(tid);
 
-    _note_commit(tid);
     mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE, 
-                                                   tid, version),
+                                                   tid, projected_version),
                                   new C_Commit(this, req));
   }
   else if (tid <= version) {
-    dout(0) << "got commit for tid " << tid << " <= " << version 
-           << ", already committed, sending ack." 
-           << dendl;
-    _commit_logged(req);
+    dout(0) << "got commit for tid " << tid << " <= " << version
+           << ", already committed, sending ack." << dendl;
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, tid);
+    mds->send_message(reply, req->get_connection());
+    req->put();
   } 
   else {
     // wtf.
@@ -133,39 +143,95 @@ void MDSTableServer::_commit_logged(MMDSTableRequest *req)
   dout(7) << "_commit_logged, sending ACK" << dendl;
 
   assert(g_conf->mds_kill_mdstable_at != 6);
+  version_t tid = req->get_tid();
+
+  pending_for_mds.erase(tid);
+  committing_tids.erase(tid);
+
+  _commit(tid, req);
+  _note_commit(tid);
 
   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
   mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
   req->put();
 }
 
+class C_Rollback : public MDSLogContextBase {
+  MDSTableServer *server;
+  MMDSTableRequest *req;
+  MDSRank *get_mds() override { return server->mds; }
+public:
+  C_Rollback(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
+  void finish(int r) override {
+    server->_rollback_logged(req);
+  }
+};
+
 // ROLLBACK
-/* This function DOES put the passed message before returning */
 void MDSTableServer::handle_rollback(MMDSTableRequest *req)
 {
   dout(7) << "handle_rollback " << *req << dendl;
 
   version_t tid = req->get_tid();
   assert(pending_for_mds.count(tid));
+  assert(!committing_tids.count(tid));
+
+  projected_version++;
+  committing_tids.insert(tid);
+
+  mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE,
+                                                 tid, projected_version),
+                                new C_Rollback(this, req));
+}
+
+/* This function DOES put the passed message before returning */
+void MDSTableServer::_rollback_logged(MMDSTableRequest *req)
+{
+  dout(7) << "_rollback_logged " << *req << dendl;
+
+  assert(g_conf->mds_kill_mdstable_at != 7);
+  version_t tid = req->get_tid();
+
+  pending_for_mds.erase(tid);
+  committing_tids.erase(tid);
+
   _rollback(tid);
   _note_rollback(tid);
-  mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE, 
-                                                 tid, version));
+
   req->put();
 }
 
 
 
 // SERVER UPDATE
+class C_ServerUpdate : public MDSLogContextBase {
+  MDSTableServer *server;
+  bufferlist bl;
+  MDSRank *get_mds() override { return server->mds; }
+public:
+  C_ServerUpdate(MDSTableServer *s, bufferlist &b)  : server(s), bl(b) {}
+  void finish(int r) override {
+    server->_server_update_logged(bl);
+  }
+};
 
 void MDSTableServer::do_server_update(bufferlist& bl)
 {
   dout(10) << "do_server_update len " << bl.length() << dendl;
-  _server_update(bl);
-  ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, version);
+
+  projected_version++;
+
+  ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, projected_version);
   mds->mdlog->start_entry(le);
   le->mutation = bl;
-  mds->mdlog->submit_entry(le);
+  mds->mdlog->submit_entry(le, new C_ServerUpdate(this, bl));
+}
+
+void MDSTableServer::_server_update_logged(bufferlist& bl)
+{
+  dout(10) << "_server_update_logged len " << bl.length() << dendl;
+  _server_update(bl);
+  _note_server_update(bl);
 }
 
 
index f7de717c87d8f29469aa93f9ae8ccc2e4390201b..34bca0519e22e9d20d1184570e52c36a557b948a 100644 (file)
 class MMDSTableRequest;
 
 class MDSTableServer : public MDSTable {
-public:
+protected:
   int table;
+private:
   map<version_t,mds_table_pending_t> pending_for_mds;  // ** child should encode this! **
+  set<version_t> committing_tids;
 
-
-private:
   void handle_prepare(MMDSTableRequest *m);
   void _prepare_logged(MMDSTableRequest *m, version_t tid);
   friend class C_Prepare;
@@ -34,28 +34,45 @@ private:
   void _commit_logged(MMDSTableRequest *m);
   friend class C_Commit;
 
-
   void handle_rollback(MMDSTableRequest *m);
+  void _rollback_logged(MMDSTableRequest *m);
+  friend class C_Rollback;
+
+  void _server_update_logged(bufferlist& bl);
+  friend class C_ServerUpdate;
 
- public:
+public:
   virtual void handle_query(MMDSTableRequest *m) = 0;
   virtual void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) = 0;
-  virtual bool _commit(version_t tid, MMDSTableRequest *req=NULL) = 0;
+  virtual void _commit(version_t tid, MMDSTableRequest *req=NULL) = 0;
   virtual void _rollback(version_t tid) = 0;
   virtual void _server_update(bufferlist& bl) { ceph_abort(); }
 
-  void _note_prepare(mds_rank_t mds, uint64_t reqid) {
+  void _note_prepare(mds_rank_t mds, uint64_t reqid, bool replay=false) {
+    version++;
+    if (replay)
+      projected_version = version;
     pending_for_mds[version].mds = mds;
     pending_for_mds[version].reqid = reqid;
     pending_for_mds[version].tid = version;
   }
-  void _note_commit(uint64_t tid) {
+  void _note_commit(uint64_t tid, bool replay=false) {
+    version++;
+    if (replay)
+      projected_version = version;
     pending_for_mds.erase(tid);
   }
-  void _note_rollback(uint64_t tid) {
+  void _note_rollback(uint64_t tid, bool replay=false) {
+    version++;
+    if (replay)
+      projected_version = version;
     pending_for_mds.erase(tid);
   }
-  
+  void _note_server_update(bufferlist& bl, bool replay=false) {
+    version++;
+    if (replay)
+      projected_version = version;
+  }
 
   MDSTableServer(MDSRank *m, int tab) : MDSTable(m, get_mdstable_name(tab), false), table(tab) {}
   ~MDSTableServer() override {}
index ea78bff1d087457b9fab90292b86f08c4e5cb809..6c37bdc0b1c01ed215df22ba2b9f566f44fce168 100644 (file)
@@ -72,8 +72,6 @@ void SnapServer::_prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds)
   switch (op) {
   case TABLE_OP_CREATE:
     {
-      version++;
-
       SnapInfo info;
       ::decode(info.ino, p);
       if (!p.end()) {
@@ -98,7 +96,6 @@ void SnapServer::_prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds)
       snapid_t snapid;
       ::decode(ino, p);    // not used, currently.
       ::decode(snapid, p);
-      version++;
 
       // bump last_snap... we use it as a version value on the snaprealm.
       ++last_snap;
@@ -120,7 +117,6 @@ void SnapServer::_prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds)
       ::decode(info.stamp, p);
       info.long_name = "update";
 
-      version++;
       // bump last_snap... we use it as a version value on the snaprealm.
       ++last_snap;
       pending_update[version] = info;
@@ -144,7 +140,7 @@ bool SnapServer::_is_prepared(version_t tid) const
     pending_destroy.count(tid);
 }
 
-bool SnapServer::_commit(version_t tid, MMDSTableRequest *req)
+void SnapServer::_commit(version_t tid, MMDSTableRequest *req)
 {
   if (pending_update.count(tid)) {
     SnapInfo &info = pending_update[tid];
@@ -180,10 +176,7 @@ bool SnapServer::_commit(version_t tid, MMDSTableRequest *req)
   else
     ceph_abort();
 
-  // bump version.
-  version++;
   //dump();
-  return true;
 }
 
 void SnapServer::_rollback(version_t tid) 
@@ -212,8 +205,6 @@ void SnapServer::_rollback(version_t tid)
   else
     ceph_abort();
 
-  // bump version.
-  version++;
   //dump();
 }
 
@@ -234,8 +225,6 @@ void SnapServer::_server_update(bufferlist& bl)
     if (need_to_purge[p->first].empty())
       need_to_purge.erase(p->first);
   }
-
-  version++;
 }
 
 void SnapServer::handle_query(MMDSTableRequest *req)
@@ -373,5 +362,4 @@ void SnapServer::generate_test_instances(list<SnapServer*>& ls)
   populated->pending_noop.insert(890);
 
   ls.push_back(populated);
-
 }
index a93c9234fc3ee15919acb67d5e33dede056e5a76..058a6e2bdb2ab31ba8f449df0bd1b38da6ad867d 100644 (file)
@@ -82,7 +82,7 @@ public:
   // server bits
   void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) override;
   bool _is_prepared(version_t tid) const;
-  bool _commit(version_t tid, MMDSTableRequest *req=NULL) override;
+  void _commit(version_t tid, MMDSTableRequest *req=NULL) override;
   void _rollback(version_t tid) override;
   void _server_update(bufferlist& bl) override;
   void handle_query(MMDSTableRequest *m) override;
index 4e6ab183bb88dc4533ec22ad3159dcc3381fcb93..f97e0eb8726a6bd7932aaa36cb5b505a3a9be993 100644 (file)
@@ -1937,19 +1937,20 @@ void ETableServer::replay(MDSRank *mds)
 
   switch (op) {
   case TABLESERVER_OP_PREPARE:
+    server->_note_prepare(bymds, reqid, true);
     server->_prepare(mutation, reqid, bymds);
-    server->_note_prepare(bymds, reqid);
     break;
   case TABLESERVER_OP_COMMIT:
     server->_commit(tid);
-    server->_note_commit(tid);
+    server->_note_commit(tid, true);
     break;
   case TABLESERVER_OP_ROLLBACK:
     server->_rollback(tid);
-    server->_note_rollback(tid);
+    server->_note_rollback(tid, true);
     break;
   case TABLESERVER_OP_SERVER_UPDATE:
     server->_server_update(mutation);
+    server->_note_server_update(mutation, true);
     break;
   default:
     mds->clog->error() << "invalid tableserver op in ETableServer";