]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: behave when we pipeline session updates to journal
authorSage Weil <sage@newdream.net>
Tue, 9 Feb 2010 18:03:12 +0000 (10:03 -0800)
committerSage Weil <sage@newdream.net>
Wed, 10 Feb 2010 18:31:16 +0000 (10:31 -0800)
src/mds/Server.cc
src/mds/Server.h
src/mds/SessionMap.h

index 7380d8dba043f9c83e067692b6594afe24e453d2..377d6a324016ceee2c0708a1b741d18240776367 100644 (file)
@@ -144,18 +144,19 @@ void Server::dispatch(Message *m)
 class C_MDS_session_finish : public Context {
   MDS *mds;
   Session *session;
+  __u64 state_seq;
   bool open;
   version_t cmapv;
   interval_set<inodeno_t> inos;
   version_t inotablev;
 public:
-  C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv) :
-    mds(m), session(se), open(s), cmapv(mv), inotablev(0) { }
-  C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv) :
-    mds(m), session(se), open(s), cmapv(mv), inos(i), inotablev(iv) { }
+  C_MDS_session_finish(MDS *m, Session *se, __u64 sseq, bool s, version_t mv) :
+    mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0) { }
+  C_MDS_session_finish(MDS *m, Session *se, __u64 sseq, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv) :
+    mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv) { }
   void finish(int r) {
     assert(r == 0);
-    mds->server->_session_logged(session, open, cmapv, inos, inotablev);
+    mds->server->_session_logged(session, state_seq, open, cmapv, inos, inotablev);
   }
 };
 
@@ -185,6 +186,7 @@ void Server::handle_client_session(MClientSession *m)
     return;
   }
 
+  __u64 sseq = 0;
   switch (m->get_op()) {
   case CEPH_SESSION_REQUEST_OPEN:
     if (session->is_opening() || session->is_open()) {
@@ -193,11 +195,11 @@ void Server::handle_client_session(MClientSession *m)
     }
     if (session->is_closed())
       mds->sessionmap.add_session(session);
-    mds->sessionmap.set_state(session, Session::STATE_OPENING);
+    sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
     mds->sessionmap.touch_session(session);
     pv = ++mds->sessionmap.projected;
     mdlog->start_submit_entry(new ESession(m->get_source_inst(), true, pv),
-                             new C_MDS_session_finish(mds, session, true, pv));
+                             new C_MDS_session_finish(mds, session, sseq, true, pv));
     mdlog->flush();
     break;
 
@@ -231,7 +233,7 @@ void Server::handle_client_session(MClientSession *m)
                 << ", BUGGY!" << dendl;
        assert(0);
       }
-      mds->sessionmap.set_state(session, Session::STATE_CLOSING);
+      sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
       pv = ++mds->sessionmap.projected;
       
       interval_set<inodeno_t> both = session->prealloc_inos;
@@ -243,7 +245,7 @@ void Server::handle_client_session(MClientSession *m)
        piv = 0;
       
       mdlog->start_submit_entry(new ESession(m->get_source_inst(), false, pv, both, piv),
-                               new C_MDS_session_finish(mds, session, false, pv, both, piv));
+                               new C_MDS_session_finish(mds, session, sseq, false, pv, both, piv));
       mdlog->flush();
     }
     break;
@@ -253,18 +255,21 @@ void Server::handle_client_session(MClientSession *m)
   }
 }
 
-void Server::_session_logged(Session *session, bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv)
+void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv)
 {
-  dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close")
+  dout(10) << "_session_logged " << session->inst << " state_seq " << state_seq << " " << (open ? "open":"close")
           << " " << pv << dendl;
 
   // apply
-  if (open) {
+  if (session->get_state_seq() != state_seq) {
+    dout(10) << " journaled state_seq " << state_seq << " != current " << session->get_state_seq()
+            << ", noop" << dendl;
+    // close must have been canceled (by an import?), or any number of other things..
+  } else if (open) {
     assert(session->is_opening());
     mds->sessionmap.set_state(session, Session::STATE_OPEN);
     mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
   } else if (session->is_closing() || session->is_stale_closing()) {
-
     // kill any lingering capabilities, leases, requests
     while (!session->caps.empty()) {
       Capability *cap = session->caps.front();
@@ -295,8 +300,7 @@ void Server::_session_logged(Session *session, bool open, version_t pv, interval
     session->clear();
     mds->sessionmap.remove_session(session);
   } else {
-    // close must have been canceled (by an import?) ...
-    assert(!open);
+    assert(0);
   }
   mds->sessionmap.version++;  // noop
 }
@@ -349,11 +353,12 @@ void Server::terminate_sessions()
        p != sessions.end();
        ++p) {
     Session *session = *p;
-    if (session->is_closing()) continue;
-    mds->sessionmap.set_state(session, Session::STATE_CLOSING);
+    if (session->is_closing() || session->is_closed())
+      continue;
+    __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
     version_t pv = ++mds->sessionmap.projected;
     mdlog->start_submit_entry(new ESession(session->inst, false, pv),
-                             new C_MDS_session_finish(mds, session, false, pv));
+                             new C_MDS_session_finish(mds, session, sseq, false, pv));
     mdlog->flush();
   }
 }
@@ -419,9 +424,9 @@ void Server::find_idle_sessions()
   }
 }
 
-void Server::end_session (Session *session)
+void Server::end_session(Session *session)
 {
-  assert (session);
+  assert(session);
   mds->sessionmap.set_state(session, Session::STATE_STALE_PURGING);
   if (session->prealloc_inos.empty()) {
     _finish_session_purge(session);
@@ -441,10 +446,10 @@ void Server::_finish_session_purge(Session *session)
 {
   dout(10) << "_finish_session_purge " << session->inst << dendl;
   assert(session->is_stale_purging());
-  mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING);
+  __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING);
   version_t pv = ++mds->sessionmap.projected;
   mdlog->start_submit_entry(new ESession(session->inst, false, pv),
-                           new C_MDS_session_finish(mds, session, false, pv));
+                           new C_MDS_session_finish(mds, session, sseq, false, pv));
   mdlog->flush();
 }
 
@@ -507,8 +512,9 @@ void Server::handle_client_reconnect(MClientReconnect *m)
     dout(7) << " client had no session, removing from session map" << dendl;
     assert(session);  // ?
     version_t pv = ++mds->sessionmap.projected;
+    __u64 sseq = session->get_state_seq();
     mdlog->start_submit_entry(new ESession(session->inst, false, pv),
-                             new C_MDS_session_finish(mds, session, false, pv));
+                             new C_MDS_session_finish(mds, session, sseq, false, pv));
     mdlog->flush();
   } else {
     
index 4aa9378d3add1e0437346daf118edfc46882b54e..ad562e261bcc7a1f99e70767e6d40ec595f75167 100644 (file)
@@ -69,7 +69,8 @@ public:
 
   Session *get_session(Message *m);
   void handle_client_session(class MClientSession *m);
-  void _session_logged(Session *session, bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
+  void _session_logged(Session *session, __u64 state_seq, 
+                      bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
   void _finish_session_purge(Session *);
   version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm);
   void finish_force_open_sessions(map<client_t,entity_inst_t> &cm);
index 14b8bc9d30058569b1d5638213d9ff34d8cece29..5c5e4b87fa88e201b8b05571f996695d374d89e7 100644 (file)
@@ -50,6 +50,7 @@ public:
 
 private:
   int state;
+  __u64 state_seq;
   friend class SessionMap;
 public:
   entity_inst_t inst;
@@ -89,6 +90,7 @@ public:
   client_t get_client() { return client_t(inst.name.num()); }
 
   int get_state() { return state; }
+  __u64 get_state_seq() { return state_seq; }
   bool is_new() { return state == STATE_NEW; }
   bool is_opening() { return state == STATE_OPENING; }
   bool is_open() { return state == STATE_OPEN; }
@@ -137,7 +139,7 @@ public:
 
 
   Session() : 
-    state(STATE_NEW), 
+    state(STATE_NEW), state_seq(0),
     session_list_item(this),
     cap_push_seq(0) { }
   ~Session() {
@@ -241,11 +243,13 @@ public:
     if (by_state[state].empty()) return 0;
     return by_state[state].front();
   }
-  void set_state(Session *session, int s) {
+  __u64 set_state(Session *session, int s) {
     if (session->state != s) {
       session->state = s;
+      session->state_seq++;
       by_state[s].push_back(&session->session_list_item);
     }
+    return session->state_seq;
   }
   void dump();