]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: notify all mds about prepared snaptable update
authorYan, Zheng <zyan@redhat.com>
Thu, 20 Jul 2017 07:24:53 +0000 (15:24 +0800)
committerYan, Zheng <zyan@redhat.com>
Fri, 9 Feb 2018 09:46:55 +0000 (17:46 +0800)
After snaptable update get prepared, push the update preparation to
all active snaptable clients, then send reply to update initiator.
By this way, the initiator know that all mds have record the update
preparation in their cache. When committing the snaptable update,
the initiator notifies all mds about the commit. Bystander mds'
snaptable cache get synchronized when it receives the notification.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/MDSRank.cc
src/mds/MDSTableClient.cc
src/mds/MDSTableClient.h
src/mds/MDSTableServer.cc
src/mds/MDSTableServer.h
src/mds/Server.cc
src/mds/SnapClient.cc
src/mds/SnapClient.h
src/mds/SnapServer.cc
src/mds/SnapServer.h
src/mds/mds_table_types.h

index e63061b2e8228c49fbc56c84a88edbcb4fac71b5..2ef861880cd16a6309d19139e5c0031de1d4b319 100644 (file)
@@ -1800,8 +1800,11 @@ void MDSRankDispatcher::handle_mds_map(
     oldmap->get_stopped_mds_set(oldstopped);
     mdsmap->get_stopped_mds_set(stopped);
     for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
-      if (oldstopped.count(*p) == 0)      // newly so?
+      if (oldstopped.count(*p) == 0) {     // newly so?
        mdcache->migrator->handle_mds_failure_or_stop(*p);
+       if (mdsmap->get_tableserver() == whoami)
+         snapserver->handle_mds_failure_or_stop(*p);
+      }
   }
 
   {
@@ -1866,6 +1869,9 @@ void MDSRank::handle_mds_failure(mds_rank_t who)
 
   mdcache->handle_mds_failure(who);
 
+  if (mdsmap->get_tableserver() == whoami)
+    snapserver->handle_mds_failure_or_stop(who);
+
   snapclient->handle_mds_failure(who);
 }
 
index 7d5c03a36d0d29d867abf3242e49622cf1533fe1..61b901e2f7f5e221d915f26c89285ee3978f4463 100644 (file)
@@ -60,6 +60,10 @@ void MDSTableClient::handle_request(class MMDSTableRequest *m)
   case TABLESERVER_OP_QUERY_REPLY:
     handle_query_result(m);
     break;
+
+  case TABLESERVER_OP_NOTIFY_PREP:
+    handle_notify_prep(m);
+    break;
     
   case TABLESERVER_OP_AGREE:
     if (pending_prepare.count(reqid)) {
index 0bd4f78018a1a3d28a4a5318aebe84bd19e32cf0..ac4e66d072dd6735388a5f8e884861012282c973 100644 (file)
@@ -85,6 +85,7 @@ public:
   // child must implement
   virtual void resend_queries() = 0;
   virtual void handle_query_result(MMDSTableRequest *m) = 0;
+  virtual void handle_notify_prep(MMDSTableRequest *m) = 0;
   virtual void notify_commit(version_t tid) = 0;
 
   // and friendly front-end for _prepare.
index 88db122ab9c527f1825b16ff8f051f9a9c4dd8d6..0cc505811fdecc2b7368eecd19d57258fc78a057 100644 (file)
@@ -34,6 +34,7 @@ void MDSTableServer::handle_request(MMDSTableRequest *req)
   case TABLESERVER_OP_PREPARE: return handle_prepare(req);
   case TABLESERVER_OP_COMMIT: return handle_commit(req);
   case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
+  case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req);
   default: assert(0 == "unrecognized mds_table_server request op");
   }
 }
@@ -83,10 +84,42 @@ void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
 
   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid);
   reply->bl = req->bl;
-  mds->send_message_mds(reply, from);
+
+  if (_notify_prep(tid)) {
+    auto& p = pending_notifies[tid];
+    p.notify_ack_gather = active_clients;
+    p.mds = from;
+    p.reply = reply;
+  } else {
+    mds->send_message_mds(reply, from);
+  }
   req->put();
 }
 
+void MDSTableServer::handle_notify_ack(MMDSTableRequest *m)
+{
+  dout(7) << __func__ << " " << *m << dendl;
+  mds_rank_t from = mds_rank_t(m->get_source().num());
+  version_t tid = m->get_tid();
+
+  auto p = pending_notifies.find(tid);
+  if (p != pending_notifies.end()) {
+    if (p->second.notify_ack_gather.erase(from)) {
+      if (p->second.notify_ack_gather.empty()) {
+       if (p->second.onfinish)
+         p->second.onfinish->complete(0);
+       else
+         mds->send_message_mds(p->second.reply, p->second.mds);
+       pending_notifies.erase(p);
+      }
+    } else {
+      dout(0) << "got unexpected notify ack for tid " <<  tid << " from mds." << from << dendl;
+    }
+  } else {
+  }
+  m->put();
+}
+
 class C_Commit : public MDSLogContextBase {
   MDSTableServer *server;
   MMDSTableRequest *req;
@@ -234,29 +267,82 @@ void MDSTableServer::_server_update_logged(bufferlist& bl)
   _note_server_update(bl);
 }
 
-
 // recovery
 
+class C_ServerRecovery : public MDSInternalContextBase {
+  MDSTableServer *server;
+  MDSRank *get_mds() override { return server->mds; }
+public:
+  C_ServerRecovery(MDSTableServer *s)  : server(s) {}
+  void finish(int r) override {
+    server->_do_server_recovery();
+  }
+};
+
+void MDSTableServer::_do_server_recovery()
+{
+  dout(7) << __func__ << " " << active_clients <<  dendl;
+  map<mds_rank_t, uint64_t> next_reqids;
+
+  for (auto p : pending_for_mds) {
+    mds_rank_t who = p.second.mds;
+    if (!active_clients.count(who))
+      continue;
+
+    if (p.second.reqid >= next_reqids[who])
+      next_reqids[who] = p.second.reqid + 1;
+
+    version_t tid = p.second.tid;
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p.second.reqid, tid);
+    _get_reply_buffer(tid, &reply->bl);
+    mds->send_message_mds(reply, who);
+  }
+
+  for (auto p : active_clients) {
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]);
+    mds->send_message_mds(reply, p);
+  }
+  recovered = true;
+}
+
 void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
 {
-  dout(7) << "finish_recovery" << dendl;
-  for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
-    handle_mds_recovery(*p);  // resend agrees for everyone.
+  dout(7) << __func__ << dendl;
+
+  active_clients = active;
+
+  // don't know if survivor mds have received all 'notify prep' messages.
+  // so we need to send 'notify prep' again.
+  if (!pending_for_mds.empty() && _notify_prep(version)) {
+    auto& q = pending_notifies[version];
+    q.notify_ack_gather = active_clients;
+    q.mds = MDS_RANK_NONE;
+    q.onfinish = new C_ServerRecovery(this);
+  } else {
+    _do_server_recovery();
+  }
 }
 
 void MDSTableServer::handle_mds_recovery(mds_rank_t who)
 {
   dout(7) << "handle_mds_recovery mds." << who << dendl;
 
+  active_clients.insert(who);
+  if (!recovered) {
+    dout(7) << " still not recovered, delaying" << dendl;
+    return;
+  }
+
   uint64_t next_reqid = 0;
   // resend agrees for recovered mds
-  for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
-       p != pending_for_mds.end();
-       ++p) {
+  for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) {
     if (p->second.mds != who)
       continue;
+    assert(!pending_notifies.count(p->second.tid));
+
     if (p->second.reqid >= next_reqid)
       next_reqid = p->second.reqid + 1;
+
     MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
     _get_reply_buffer(p->second.tid, &reply->bl);
     mds->send_message_mds(reply, who);
@@ -265,3 +351,35 @@ void MDSTableServer::handle_mds_recovery(mds_rank_t who)
   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
   mds->send_message_mds(reply, who);
 }
+
+void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who)
+{
+  dout(7) << __func__ << " mds." << who << dendl;
+
+  active_clients.erase(who);
+
+  list<MMDSTableRequest*> rollback;
+  for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) {
+    auto q = p++;
+    if (q->second.mds == who) {
+      // haven't sent reply yet.
+      rollback.push_back(q->second.reply);
+      pending_notifies.erase(q);
+    } else if (q->second.notify_ack_gather.erase(who)) {
+      // the failed mds will reload snaptable when it recovers.
+      // so we can remove it from the gather set.
+      if (q->second.notify_ack_gather.empty()) {
+       if (q->second.onfinish)
+         q->second.onfinish->complete(0);
+       else
+         mds->send_message_mds(q->second.reply, q->second.mds);
+       pending_notifies.erase(q);
+      }
+    }
+  }
+
+  for (auto p : rollback) {
+    p->op = TABLESERVER_OP_ROLLBACK;
+    handle_rollback(p);
+  }
+}
index 55a380bdb5eaae17505024bb080deb7b1e7789d7..a9edead457c6d2fcdab49cbee07a7bb5f1a40217 100644 (file)
@@ -22,10 +22,21 @@ class MMDSTableRequest;
 class MDSTableServer : public MDSTable {
 protected:
   int table;
+  bool recovered;
+  set<mds_rank_t> active_clients;
 private:
   map<version_t,mds_table_pending_t> pending_for_mds;  // ** child should encode this! **
   set<version_t> committing_tids;
 
+  struct notify_info_t {
+    set<mds_rank_t> notify_ack_gather;
+    mds_rank_t mds;
+    MMDSTableRequest *reply;
+    MDSInternalContextBase *onfinish;
+    notify_info_t() : reply(NULL), onfinish(NULL) {}
+  };
+  map<version_t, notify_info_t> pending_notifies;
+
   void handle_prepare(MMDSTableRequest *m);
   void _prepare_logged(MMDSTableRequest *m, version_t tid);
   friend class C_Prepare;
@@ -41,6 +52,8 @@ private:
   void _server_update_logged(bufferlist& bl);
   friend class C_ServerUpdate;
 
+  void handle_notify_ack(MMDSTableRequest *m);
+
 public:
   virtual void handle_query(MMDSTableRequest *m) = 0;
   virtual void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) = 0;
@@ -48,6 +61,7 @@ public:
   virtual void _commit(version_t tid, MMDSTableRequest *req=NULL) = 0;
   virtual void _rollback(version_t tid) = 0;
   virtual void _server_update(bufferlist& bl) { ceph_abort(); }
+  virtual bool _notify_prep(version_t tid) { return false; };
 
   void _note_prepare(mds_rank_t mds, uint64_t reqid, bool replay=false) {
     version++;
@@ -75,7 +89,8 @@ public:
       projected_version = version;
   }
 
-  MDSTableServer(MDSRank *m, int tab) : MDSTable(m, get_mdstable_name(tab), false), table(tab) {}
+  MDSTableServer(MDSRank *m, int tab) :
+    MDSTable(m, get_mdstable_name(tab), false), table(tab), recovered(false) {}
   ~MDSTableServer() override {}
 
   void handle_request(MMDSTableRequest *m);
@@ -95,7 +110,11 @@ public:
 
   // recovery
   void finish_recovery(set<mds_rank_t>& active);
+  void _do_server_recovery();
+  friend class C_ServerRecovery;
+
   void handle_mds_recovery(mds_rank_t who);
+  void handle_mds_failure_or_stop(mds_rank_t who);
 };
 
 #endif
index c44bdc92aeec01c27653cfe694ccc920dc3b8595..06ddb0c7dcb8c0d9892f0edbe7de35c30e36203b 100644 (file)
@@ -8684,11 +8684,7 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
   decode(snapid, p);
   dout(10) << " stid " << stid << " snapid " << snapid << dendl;
 
-  // FIXME: notify all other mds the change
-  if (stid > mds->snapclient->get_cached_version()) {
-    mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
+  assert(mds->snapclient->get_cached_version() >= stid);
 
   // journal
   SnapInfo info;
@@ -8827,11 +8823,7 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   decode(seq, p);  
   dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
 
-  // FIXME: notify all other mds the change
-  if (stid > mds->snapclient->get_cached_version()) {
-    mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
+  assert(mds->snapclient->get_cached_version() >= stid);
 
   // journal
   auto &pi = diri->project_inode(false, true);
@@ -8973,11 +8965,7 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   version_t stid = mdr->more()->stid;
   dout(10) << " stid is " << stid << dendl;
 
-  // FIXME: notify all other mds the change
-  if (stid > mds->snapclient->get_cached_version()) {
-    mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
+  assert(mds->snapclient->get_cached_version() >= stid);
 
   // journal
   auto &pi = diri->project_inode(false, true);
index 51bc29e886c5e8a9b3408db42ab0119941f5d3b4..e96000ebac263c577102f851c8a1e4ae8a506245 100644 (file)
@@ -85,7 +85,7 @@ void SnapClient::handle_query_result(MMDSTableRequest *m)
     }
   }
 
-  if (m->reqid >= sync_reqid)
+  if (m->op == TABLESERVER_OP_QUERY_REPLY && m->reqid >= sync_reqid)
     synced = true;
 
   if (synced && !waiting_for_version.empty()) {
@@ -102,6 +102,14 @@ void SnapClient::handle_query_result(MMDSTableRequest *m)
   }
 }
 
+void SnapClient::handle_notify_prep(MMDSTableRequest *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  handle_query_result(m);
+  MMDSTableRequest *ack = new MMDSTableRequest(table, TABLESERVER_OP_NOTIFY_ACK, 0, m->get_tid());
+  mds->send_message(ack, m->get_connection());
+}
+
 void SnapClient::notify_commit(version_t tid)
 {
   dout(10) << __func__ << " tid " << tid << dendl;
index 6182493ec4015363df8456f89044645332946a66..e71d9a1b849749a180cfc73dec9efb703b1b1142 100644 (file)
@@ -45,6 +45,7 @@ public:
 
   void resend_queries() override;
   void handle_query_result(MMDSTableRequest *m) override;
+  void handle_notify_prep(MMDSTableRequest *m) override;
   void notify_commit(version_t tid) override;
 
   void prepare_create(inodeno_t dirino, std::string_view name, utime_t stamp,
index 79e58d91b62830ce719c3723cad3a90fc67ae2bb..824f02559f8c74d6416dbff02953030be0df76af 100644 (file)
@@ -246,8 +246,29 @@ void SnapServer::_server_update(bufferlist& bl)
   }
 }
 
+bool SnapServer::_notify_prep(version_t tid)
+{
+  using ceph::encode;
+  bufferlist bl;
+  char type = 'F';
+  encode(type, bl);
+  encode(snaps, bl);
+  encode(pending_update, bl);
+  encode(pending_destroy, bl);
+  assert(version == tid);
+
+  for (auto p : active_clients) {
+    MMDSTableRequest *m = new MMDSTableRequest(table, TABLESERVER_OP_NOTIFY_PREP, 0, version);
+    m->bl = bl;
+    mds->send_message_mds(m, p);
+  }
+  return true;
+}
+
 void SnapServer::handle_query(MMDSTableRequest *req)
 {
+  using ceph::encode;
+  using ceph::decode;
   char op;
   bufferlist::iterator p = req->bl.begin();
   decode(op, p);
index 1ad34bd5ca564cd3f8b08a5e31b5865aa6280485..517f009289789c56dd711b5748d31871216fdbd0 100644 (file)
@@ -34,12 +34,6 @@ protected:
 
   version_t last_checked_osdmap;
 
-public:
-  SnapServer(MDSRank *m, MonClient *monc)
-    : MDSTableServer(m, TABLE_SNAP), mon_client(monc), last_checked_osdmap(0)
-  {}
-    
-  void reset_state() override;
   void encode_server_state(bufferlist& bl) const override {
     ENCODE_START(3, 3, bl);
     encode(last_snap, bl);
@@ -68,26 +62,33 @@ public:
     DECODE_FINISH(bl);
   }
 
-  // To permit enc/decoding in isolation in dencoder
-  SnapServer() : MDSTableServer(NULL, TABLE_SNAP), last_checked_osdmap(0) {}
-  void encode(bufferlist& bl) const {
-    encode_server_state(bl);
-  }
-  void decode(bufferlist::iterator& bl) {
-    decode_server_state(bl);
-  }
-  void dump(Formatter *f) const;
-  static void generate_test_instances(list<SnapServer*>& ls);
-
   // server bits
   void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) override;
   void _get_reply_buffer(version_t tid, bufferlist *pbl) const override;
   void _commit(version_t tid, MMDSTableRequest *req=NULL) override;
   void _rollback(version_t tid) override;
   void _server_update(bufferlist& bl) override;
+  bool _notify_prep(version_t tid) override;
   void handle_query(MMDSTableRequest *m) override;
 
+public:
+  SnapServer(MDSRank *m, MonClient *monc)
+    : MDSTableServer(m, TABLE_SNAP), mon_client(monc), last_checked_osdmap(0) {}
+  SnapServer() : MDSTableServer(NULL, TABLE_SNAP), last_checked_osdmap(0) {}
+
+  void reset_state() override;
+
   void check_osd_map(bool force);
+
+  void encode(bufferlist& bl) const {
+    encode_server_state(bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    decode_server_state(bl);
+  }
+
+  void dump(Formatter *f) const;
+  static void generate_test_instances(list<SnapServer*>& ls);
 };
 WRITE_CLASS_ENCODER(SnapServer)
 
index 4003068bfd3042ece70ce51c4acac8f6b6f08303..bd7ad47d59c2bb1795e3174adb22415c3ec0d9d1 100644 (file)
@@ -40,6 +40,8 @@ enum {
   TABLESERVER_OP_ROLLBACK     =  7,
   TABLESERVER_OP_SERVER_UPDATE = 8,
   TABLESERVER_OP_SERVER_READY = -9,
+  TABLESERVER_OP_NOTIFY_ACK   = 10,
+  TABLESERVER_OP_NOTIFY_PREP  = -11,
 };
 
 inline const char *get_mdstableserver_opname(int op) {
@@ -53,6 +55,8 @@ inline const char *get_mdstableserver_opname(int op) {
   case TABLESERVER_OP_ROLLBACK: return "rollback";
   case TABLESERVER_OP_SERVER_UPDATE: return "server_update";
   case TABLESERVER_OP_SERVER_READY: return "server_ready";
+  case TABLESERVER_OP_NOTIFY_ACK: return "notify_ack";
+  case TABLESERVER_OP_NOTIFY_PREP: return "notify_prep";
   default: ceph_abort(); return 0;
   }
 }