]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: replay all old client requests before handling new requests
authorSage Weil <sage@newdream.net>
Tue, 2 Jun 2009 20:33:07 +0000 (13:33 -0700)
committerSage Weil <sage@newdream.net>
Tue, 2 Jun 2009 20:33:07 +0000 (13:33 -0700)
Adds a new CLIENTREPLAY state between REJOIN and ACTIVE.  It it's strictly
necessary for anyone to know the MDS is handling it's backlog first, but
it doesn't hurt.

src/include/ceph_fs.h
src/mds/MDCache.h
src/mds/MDS.cc
src/mds/MDS.h
src/mds/MDSMap.h
src/mds/Server.cc
src/messages/MClientRequest.h

index 46ec816d010a993149e6f07b29d3b2dba5c74cd3..bfe1c8843b264e8d3f001aa23b29f7d0fc65fc25 100644 (file)
@@ -327,22 +327,23 @@ struct ceph_client_ticket {
  *   > 0 -> in
  *  <= 0 -> out
  */
-#define CEPH_MDS_STATE_DNE         0  /* down, does not exist. */
-#define CEPH_MDS_STATE_STOPPED    -1  /* down, once existed, but no subtrees.
-                                        empty log. */
-#define CEPH_MDS_STATE_BOOT       -4  /* up, boot announcement. */
-#define CEPH_MDS_STATE_STANDBY    -5  /* up, idle.  waiting for assignment. */
-#define CEPH_MDS_STATE_CREATING   -6  /* up, creating MDS instance. */
-#define CEPH_MDS_STATE_STARTING   -7  /* up, starting previously stopped mds. */
+#define CEPH_MDS_STATE_DNE          0  /* down, does not exist. */
+#define CEPH_MDS_STATE_STOPPED     -1  /* down, once existed, but no subtrees.
+                                         empty log. */
+#define CEPH_MDS_STATE_BOOT        -4  /* up, boot announcement. */
+#define CEPH_MDS_STATE_STANDBY     -5  /* up, idle.  waiting for assignment. */
+#define CEPH_MDS_STATE_CREATING    -6  /* up, creating MDS instance. */
+#define CEPH_MDS_STATE_STARTING    -7  /* up, starting previously stopped mds. */
 #define CEPH_MDS_STATE_STANDBY_REPLAY -8 /* up, tailing active node's journal */
 
-#define CEPH_MDS_STATE_REPLAY      8  /* up, replaying journal. */
-#define CEPH_MDS_STATE_RESOLVE     9  /* up, disambiguating distributed
-                                        operations (import, rename, etc.) */
-#define CEPH_MDS_STATE_RECONNECT   10 /* up, reconnect to clients */
-#define CEPH_MDS_STATE_REJOIN      11 /* up, rejoining distributed cache */
-#define CEPH_MDS_STATE_ACTIVE      12 /* up, active */
-#define CEPH_MDS_STATE_STOPPING    13 /* up, but exporting metadata */
+#define CEPH_MDS_STATE_REPLAY       8  /* up, replaying journal. */
+#define CEPH_MDS_STATE_RESOLVE      9  /* up, disambiguating distributed
+                                         operations (import, rename, etc.) */
+#define CEPH_MDS_STATE_RECONNECT    10 /* up, reconnect to clients */
+#define CEPH_MDS_STATE_REJOIN       11 /* up, rejoining distributed cache */
+#define CEPH_MDS_STATE_CLIENTREPLAY 12 /* up, replaying client operations */
+#define CEPH_MDS_STATE_ACTIVE       13 /* up, active */
+#define CEPH_MDS_STATE_STOPPING     14 /* up, but exporting metadata */
 
 static inline const char *ceph_mds_state_name(int s)
 {
@@ -361,6 +362,7 @@ static inline const char *ceph_mds_state_name(int s)
        case CEPH_MDS_STATE_RESOLVE:    return "up:resolve";
        case CEPH_MDS_STATE_RECONNECT:  return "up:reconnect";
        case CEPH_MDS_STATE_REJOIN:     return "up:rejoin";
+       case CEPH_MDS_STATE_CLIENTREPLAY: return "up:clientreplay";
        case CEPH_MDS_STATE_ACTIVE:     return "up:active";
        case CEPH_MDS_STATE_STOPPING:   return "up:stopping";
        default: return "";
index e7ade93d918a8fbca826ae0ba6f3cd3e4c3075af..32ef9bc644dcf33d178e90ffdc64d6d9c6e7c2e0 100644 (file)
@@ -529,6 +529,8 @@ protected:
   hash_map<metareqid_t, MDRequest*> active_requests; 
 
 public:
+  int get_num_active_requests() { return active_requests.size(); }
+
   MDRequest* request_start(MClientRequest *req);
   MDRequest* request_start_slave(metareqid_t rid, int by);
   MDRequest* request_start_internal(int op);
index 30fb42b0b083a67ac95350cad7323e17ae6147eb..5f282ca51c15b9527e2d44a796a308d50aef96e9 100644 (file)
@@ -630,12 +630,13 @@ void MDS::handle_mds_map(MMDSMap *m)
            << ceph_mds_state_name(state) << dendl;
     want_state = state;
 
-    // now active?
+    // did i just recover?
+    if ((is_active() || is_clientreplay()) &&
+       (oldstate == MDSMap::STATE_REJOIN ||
+        oldstate == MDSMap::STATE_RECONNECT)) 
+      recovery_done();
+
     if (is_active()) {
-      // did i just recover?
-      if (oldstate == MDSMap::STATE_REJOIN ||
-         oldstate == MDSMap::STATE_RECONNECT) 
-       recovery_done();
       finish_contexts(waiting_for_active);  // kick waiters
     } else if (is_replay() || is_standby_replay()) {
       replay_start();
@@ -643,6 +644,8 @@ void MDS::handle_mds_map(MMDSMap *m)
       resolve_start();
     } else if (is_reconnect()) {
       reconnect_start();
+    } else if (is_clientreplay()) {
+      clientreplay_start();
     } else if (is_creating()) {
       boot_create();
     } else if (is_starting()) {
@@ -966,19 +969,6 @@ void MDS::reconnect_done()
   request_state(MDSMap::STATE_REJOIN);    // move to rejoin state
 
   mdcache->reconnect_clean_open_file_lists();
-
-  /*
-  if (mdsmap->get_num_in_mds() == 1 &&
-      mdsmap->get_num_mds(MDSMap::STATE_FAILED) == 0) { // just me!
-
-    // finish processing caps (normally, this happens during rejoin, but we're skipping that...)
-    mdcache->rejoin_gather_finish();
-
-    request_state(MDSMap::STATE_ACTIVE);    // go active
-  } else {
-    request_state(MDSMap::STATE_REJOIN);    // move to rejoin state
-  }
-  */
 }
 
 void MDS::rejoin_joint_start()
@@ -991,6 +981,22 @@ void MDS::rejoin_done()
   dout(1) << "rejoin_done" << dendl;
   mdcache->show_subtrees();
   mdcache->show_cache();
+
+  if (waiting_for_replay.empty())
+    request_state(MDSMap::STATE_ACTIVE);
+  else
+    request_state(MDSMap::STATE_CLIENTREPLAY);
+}
+
+void MDS::clientreplay_start()
+{
+  dout(1) << "clientreplay_start" << dendl;
+  queue_waiters(waiting_for_replay); 
+}
+
+void MDS::clientreplay_done()
+{
+  dout(1) << "clientreplay_done" << dendl;
   request_state(MDSMap::STATE_ACTIVE);
 }
 
@@ -998,7 +1004,7 @@ void MDS::rejoin_done()
 void MDS::recovery_done()
 {
   dout(1) << "recovery_done -- successful recovery!" << dendl;
-  assert(is_active());
+  assert(is_active() || is_clientreplay());
   
   // kick anchortable (resent AGREEs)
   if (mdsmap->get_tableserver() == whoami) {
@@ -1017,8 +1023,6 @@ void MDS::recovery_done()
   bcast_mds_map();  
 
   mdcache->populate_mydir();
-
-  queue_waiters(waiting_for_active);
 }
 
 void MDS::handle_mds_recovery(int who) 
@@ -1258,16 +1262,26 @@ bool MDS::_dispatch(Message *m)
 
 
   // finish any triggered contexts
-  if (finished_queue.size()) {
+  static bool finishing = false;
+  if (!finishing && finished_queue.size()) {
     dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
     dout(10) << finished_queue << dendl;
     list<Context*> ls;
     ls.splice(ls.begin(), finished_queue);
     assert(finished_queue.empty());
+    finishing = true;
     finish_contexts(ls);
+    finishing = false;
+  } else {
+    // done with all client replayed requests?
+    if (!finishing &&
+       is_clientreplay() &&
+       mdcache->is_open() &&
+       mdcache->get_num_active_requests() == 0 &&
+       want_state == MDSMap::STATE_CLIENTREPLAY)
+      clientreplay_done();
   }
 
-
   // hack: thrash exports
   static utime_t start;
   utime_t now = g_clock.now();
index ea1aeb5262c4db76133a125149fa09dd84b4adf1..afdd46d365ba77b3a44babeb59fb68c9c6f49390 100644 (file)
@@ -162,7 +162,7 @@ class MDS : public Dispatcher {
   int state;         // my confirmed state
   int want_state;    // the state i want
 
-  list<Context*> waiting_for_active;
+  list<Context*> waiting_for_active, waiting_for_replay;
   map<int, list<Context*> > waiting_for_active_peer;
   list<Context*> waiting_for_nolaggy;
 
@@ -177,6 +177,9 @@ class MDS : public Dispatcher {
   void wait_for_active_peer(int who, Context *c) { 
     waiting_for_active_peer[who].push_back(c);
   }
+  void wait_for_replay(Context *c) { 
+    waiting_for_replay.push_back(c); 
+  }
 
   int get_state() { return state; } 
   bool is_creating() { return state == MDSMap::STATE_CREATING; }
@@ -186,6 +189,7 @@ class MDS : public Dispatcher {
   bool is_resolve()  { return state == MDSMap::STATE_RESOLVE; }
   bool is_reconnect() { return state == MDSMap::STATE_RECONNECT; }
   bool is_rejoin()   { return state == MDSMap::STATE_REJOIN; }
+  bool is_clientreplay()   { return state == MDSMap::STATE_CLIENTREPLAY; }
   bool is_active()   { return state == MDSMap::STATE_ACTIVE; }
   bool is_stopping() { return state == MDSMap::STATE_STOPPING; }
 
@@ -305,6 +309,8 @@ class MDS : public Dispatcher {
   void rejoin_done();
   void recovery_done();
   void handle_mds_recovery(int who);
+  void clientreplay_start();
+  void clientreplay_done();
 
   void stopping_start();
   void stopping_done();
index e8d8c7bf6a3d00a04f3483755da064a093116b69..30a0f7d30fa7fd53158877ac27ced445c99a5d7e 100644 (file)
@@ -72,6 +72,7 @@ public:
   static const int STATE_RESOLVE   =  CEPH_MDS_STATE_RESOLVE;  // up, disambiguating distributed operations (import, rename, etc.)
   static const int STATE_RECONNECT =  CEPH_MDS_STATE_RECONNECT;  // up, reconnect to clients
   static const int STATE_REJOIN    =  CEPH_MDS_STATE_REJOIN; // up, replayed journal, rejoining distributed cache
+  static const int STATE_CLIENTREPLAY = CEPH_MDS_STATE_CLIENTREPLAY; // up, active
   static const int STATE_ACTIVE =     CEPH_MDS_STATE_ACTIVE; // up, active
   static const int STATE_STOPPING  =  CEPH_MDS_STATE_STOPPING; // up, exporting metadata (-> standby or out)
   
@@ -300,6 +301,7 @@ public:
   bool is_resolve(int m)   { return get_state(m) == STATE_RESOLVE; }
   bool is_reconnect(int m) { return get_state(m) == STATE_RECONNECT; }
   bool is_rejoin(int m)    { return get_state(m) == STATE_REJOIN; }
+  bool is_clientreplay(int m)   { return get_state(m) == STATE_CLIENTREPLAY; }
   bool is_active(int m)   { return get_state(m) == STATE_ACTIVE; }
   bool is_stopping(int m) { return get_state(m) == STATE_STOPPING; }
   bool is_active_or_stopping(int m)   { return is_active(m) || is_stopping(m); }
index ad1e314dcd789bf2d823ebbb3ca625de1913713e..908a397f34f0b93ea710dfe5a02dd1c04b8a6392 100644 (file)
@@ -99,10 +99,23 @@ void Server::dispatch(Message *m)
   }
 
   // active?
-  if (!mds->is_active() && !mds->is_stopping()) {
-    dout(3) << "not active yet, waiting" << dendl;
-    mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
-    return;
+  if (!mds->is_active() && 
+      !(mds->is_stopping() && m->get_orig_source().is_mds())) {
+    if (mds->is_reconnect() &&
+       m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
+       ((MClientRequest*)m)->is_replay()) {
+      dout(3) << "queuing replayed op" << dendl;
+      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+      return;
+    } else if (mds->is_clientreplay() &&
+              m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
+              ((MClientRequest*)m)->is_replay()) {
+      // replaying!
+    } else {
+      dout(3) << "not active yet, waiting" << dendl;
+      mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
+      return;
+    }
   }
 
   switch (m->get_type()) {
@@ -617,6 +630,11 @@ void Server::early_reply(MDRequest *mdr, CInode *tracei, CDentry *tracedn)
   if (client_inst.name.is_mds())
     return;
 
+  if (req->is_replay()) {
+    dout(10) << "early_reply - none for replay request" << dendl;
+    return;
+  }
+
   MClientReply *reply = new MClientReply(mdr->client_request, 0);
   reply->set_unsafe();
 
@@ -821,13 +839,6 @@ void Server::handle_client_request(MClientRequest *req)
 
   if (logger) logger->inc(l_mdss_hcreq);
 
-  if (!mds->is_active() &&
-      !(mds->is_stopping() && req->get_orig_source().is_mds())) {
-    dout(5) << " not active (or stopping+mds), discarding request." << dendl;
-    delete req;
-    return;
-  }
-  
   if (!mdcache->is_open()) {
     dout(5) << "waiting for root" << dendl;
     mdcache->wait_for_open(new C_MDS_RetryMessage(mds, req));
index ff8bff5821366e771c075793ab3bef28afdd2be8..cca6cc35a75354e5ed5202608bd2e88b5fc74c29 100644 (file)
@@ -186,6 +186,8 @@ public:
       out << " " << get_filepath2();
     if (head.num_retry)
       out << " RETRY=" << (int)head.num_retry;
+    if (get_flags() & CEPH_MDS_FLAG_REPLAY)
+      out << " REPLAY";
     out << ")";
   }