]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: fix up client session importing
authorSage Weil <sage@newdream.net>
Tue, 23 Mar 2010 21:54:13 +0000 (14:54 -0700)
committerSage Weil <sage@newdream.net>
Tue, 23 Mar 2010 21:54:13 +0000 (14:54 -0700)
Keep import counter for each session, for overlapping imports.

Prevent a session close or kill during an import.

src/TODO
src/mds/MDCache.h
src/mds/Migrator.cc
src/mds/Migrator.h
src/mds/Server.cc
src/mds/Server.h
src/mds/SessionMap.h

index 5db30e3163d640dd5e9fb412b7229f1c946b900e..96e03b507c018398cf0c366443c3ad160ab6fc3f 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -60,7 +60,6 @@ filestore
   - need an osdmap cache layer?
 
 bugs
-- mds prepare_force_open_sessions, then import aborts.. session is still OPENING but no client_session is sent...
 - rm -r failure (on kernel tree)
 - dbench 1, restart mds (may take a few times), dbench will error out.
 
@@ -369,4 +368,4 @@ radosgw
 
 
 -- for nicer kclient debug output (everything but messenger, but including msg in/out)
-echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p  > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p  > /sys/kernel/debug/dynamic_debug/control
\ No newline at end of file
+echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p  > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p  > /sys/kernel/debug/dynamic_debug/control
index b43468e9a8e4d350cf5ca310bcfaa2fe09952c50..755c7116c9204863e11631be26a277955b85af4a 100644 (file)
@@ -305,6 +305,7 @@ struct MDRequest : public Mutation {
     bool was_link_merge;
 
     map<client_t,entity_inst_t> imported_client_map;
+    map<client_t,__u64> sseq_map;
     map<CInode*, map<client_t,Capability::Export> > cap_imports;
     
     // for snaps
index f59a7fb64a09a9a54a16f2b5bcfa6fc6c45c0db6..f92de726c8ee60003a266df9948e4fd41e0c5db3 100644 (file)
@@ -1648,12 +1648,13 @@ class C_MDS_ImportDirLoggedStart : public Context {
   int from;
 public:
   map<client_t,entity_inst_t> imported_client_map;
+  map<client_t,__u64> sseqmap;
 
   C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) :
     migrator(m), dir(d), from(f) {
   }
   void finish(int r) {
-    migrator->import_logged_start(dir, from, imported_client_map);
+    migrator->import_logged_start(dir, from, imported_client_map, sseqmap);
   }
 };
 
@@ -1686,7 +1687,7 @@ void Migrator::handle_export_dir(MExportDir *m)
   bufferlist::iterator cmp = m->client_map.begin();
   ::decode(onlogged->imported_client_map, cmp);
   assert(cmp.end());
-  le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map);
+  le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap);
   le->client_map.claim(m->client_map);
 
   bufferlist::iterator blp = m->export_data.begin();
@@ -1910,7 +1911,8 @@ void Migrator::import_reverse_final(CDir *dir)
 
 
 void Migrator::import_logged_start(CDir *dir, int from,
-                                  map<client_t,entity_inst_t>& imported_client_map)
+                                  map<client_t,entity_inst_t>& imported_client_map,
+                                  map<client_t,__u64>& sseqmap)
 {
   dout(7) << "import_logged " << *dir << dendl;
 
@@ -1920,7 +1922,7 @@ void Migrator::import_logged_start(CDir *dir, int from,
   assert (g_conf.mds_kill_import_at != 7);
 
   // force open client sessions and finish cap import
-  mds->server->finish_force_open_sessions(imported_client_map);
+  mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
   
   for (map<CInode*, map<client_t,Capability::Export> >::iterator p = import_caps[dir].begin();
        p != import_caps[dir].end();
@@ -2307,10 +2309,12 @@ class C_M_LoggedImportCaps : public Context {
   int from;
 public:
   map<CInode*, map<client_t,Capability::Export> > cap_imports;
+  map<client_t,entity_inst_t> client_map;
+  map<client_t,__u64> sseqmap;
 
   C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {}
   void finish(int r) {
-    migrator->logged_import_caps(in, from, cap_imports);
+    migrator->logged_import_caps(in, from, cap_imports, client_map, sseqmap);
   }  
 };
 
@@ -2326,6 +2330,8 @@ void Migrator::handle_export_caps(MExportCaps *ex)
    */
 
   C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num());
+  finish->client_map.swap(ex->client_map);
+
   ESessions *le = new ESessions(++mds->sessionmap.projected);
   mds->mdlog->start_entry(le);
 
@@ -2335,8 +2341,8 @@ void Migrator::handle_export_caps(MExportCaps *ex)
   assert(!finish->cap_imports.empty());   // thus, inode is pinned.
   
   // journal open client sessions
-  mds->server->prepare_force_open_sessions(ex->client_map);
-  le->client_map.swap(ex->client_map);
+  mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
+  le->client_map = finish->client_map;
   
   mds->mdlog->submit_entry(le);
   mds->mdlog->wait_for_safe(finish);
@@ -2348,9 +2354,15 @@ void Migrator::handle_export_caps(MExportCaps *ex)
 
 void Migrator::logged_import_caps(CInode *in, 
                                  int from,
-                                 map<CInode*, map<client_t,Capability::Export> >& cap_imports) 
+                                 map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+                                 map<client_t,entity_inst_t>& client_map,
+                                 map<client_t,__u64>& sseqmap) 
 {
   dout(10) << "logged_import_caps on " << *in << dendl;
+
+  // force open client sessions and finish cap import
+  mds->server->finish_force_open_sessions(client_map, sseqmap);
+
   assert(cap_imports.count(in));
   finish_import_inode_caps(in, from, cap_imports[in]);  
 
index 3debf202aa9e4ab042b32fefbb1a26392c4651fc..5d6df53bcc2a470bcc45a4d0148fba880a191251 100644 (file)
@@ -252,7 +252,8 @@ protected:
   void import_reverse_final(CDir *dir);
   void import_notify_abort(CDir *dir, set<CDir*>& bounds);
   void import_logged_start(CDir *dir, int from,
-                          map<client_t,entity_inst_t> &imported_client_map);
+                          map<client_t,entity_inst_t> &imported_client_map,
+                          map<client_t,__u64>& sseqmap);
   void handle_export_finish(MExportDirFinish *m);
 public:
   void import_finish(CDir *dir);
@@ -261,7 +262,9 @@ protected:
   void handle_export_caps(MExportCaps *m);
   void logged_import_caps(CInode *in, 
                          int from,
-                         map<CInode*, map<client_t,Capability::Export> >& cap_imports);
+                         map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+                         map<client_t,entity_inst_t>& client_map,
+                         map<client_t,__u64>& sseqmap);
 
 
   friend class C_MDS_ImportDirLoggedStart;
index 2427e21010666427c73ce15cb4b672d030e47f7d..cf33c147da33e8353c69ac5e1cf891edd8d0dea8 100644 (file)
@@ -229,6 +229,10 @@ void Server::handle_client_session(MClientSession *m)
        dout(10) << "already closed|closing|killing, dropping this req" << dendl;
        return;
       }
+      if (session->is_importing()) {
+       dout(10) << "ignoring close req on importing session" << dendl;
+       return;
+      }
       assert(session->is_open() || 
             session->is_stale() || 
             session->is_opening());
@@ -323,7 +327,8 @@ void Server::_session_logged(Session *session, __u64 state_seq, bool open, versi
   mds->sessionmap.version++;  // noop
 }
 
-version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm)
+version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
+                                             map<client_t,__u64>& sseqmap)
 {
   version_t pv = ++mds->sessionmap.projected;
   dout(10) << "prepare_force_open_sessions " << pv 
@@ -334,13 +339,19 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm)
     if (session->is_closed() || 
        session->is_closing() ||
        session->is_killing())
-      mds->sessionmap.set_state(session, Session::STATE_OPENING);
+      sseqmap[p->first] = mds->sessionmap.set_state(session, Session::STATE_OPENING);
+    else
+      assert(session->is_open() ||
+            session->is_opening() ||
+            session->is_stale());
+    session->inc_importing();
     mds->sessionmap.touch_session(session);
   }
   return pv;
 }
 
-void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm)
+void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
+                                       map<client_t,__u64>& sseqmap)
 {
   /*
    * FIXME: need to carefully consider the race conditions between a
@@ -352,11 +363,21 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm)
   for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
     Session *session = mds->sessionmap.get_session(p->second.name);
     assert(session);
-    if (session->is_opening()) {
-      dout(10) << "force_open_sessions opening " << session->inst << dendl;
-      mds->sessionmap.set_state(session, Session::STATE_OPEN);
-      mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+    
+    if (sseqmap.count(p->first)) {
+      __u64 sseq = sseqmap[p->first];
+      if (session->get_state_seq() != sseq) {
+       dout(10) << "force_open_sessions skipping changed " << session->inst << dendl;
+      } else {
+       dout(10) << "force_open_sessions opened " << session->inst << dendl;
+       mds->sessionmap.set_state(session, Session::STATE_OPEN);
+       mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+      }
+    } else {
+      dout(10) << "force_open_sessions skipping already-open " << session->inst << dendl;
+      assert(session->is_open() || session->is_stale());
     }
+    session->dec_importing();
   }
   mds->sessionmap.version++;
 }
@@ -428,13 +449,17 @@ void Server::find_idle_sessions()
     Session *session = mds->sessionmap.get_oldest_session(Session::STATE_STALE);
     if (!session)
       break;
+    if (session->is_importing()) {
+      dout(10) << "stopping at importing session " << session->inst << dendl;
+      break;
+    }
     assert(session->is_stale());
     if (session->last_cap_renew >= cutoff) {
       dout(20) << "oldest stale session is " << session->inst << " and sufficiently new (" 
               << session->last_cap_renew << ")" << dendl;
       break;
     }
-
+    
     stringstream ss;
     utime_t age = now;
     age -= session->last_cap_renew;
@@ -448,9 +473,10 @@ void Server::find_idle_sessions()
 
 void Server::kill_session(Session *session)
 {
-  if (session->is_opening() ||
-      session->is_open() ||
-      session->is_stale()) {
+  if ((session->is_opening() ||
+       session->is_open() ||
+       session->is_stale()) &&
+      !session->is_importing()) {
     dout(10) << "kill_session " << session << dendl;
     __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_KILLING);
     version_t pv = ++mds->sessionmap.projected;
@@ -458,10 +484,11 @@ void Server::kill_session(Session *session)
                              new C_MDS_session_finish(mds, session, sseq, false, pv));
     mdlog->flush();
   } else {
-    dout(10) << "kill_session already closing/killing " << session << dendl;
+    dout(10) << "kill_session importing or already closing/killing " << session << dendl;
     assert(session->is_closing() || 
           session->is_closed() || 
-          session->is_killing());
+          session->is_killing() ||
+          session->is_importing());
   }
 }
 
@@ -4343,7 +4370,7 @@ version_t Server::_rename_prepare_import(MDRequest *mdr, CDentry *srcdn, bufferl
   // imported caps
   ::decode(mdr->more()->imported_client_map, blp);
   ::encode(mdr->more()->imported_client_map, *client_map_bl);
-  prepare_force_open_sessions(mdr->more()->imported_client_map);
+  prepare_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
 
   list<ScatterLock*> updated_scatterlocks;  // we clear_updated explicitly below
   mdcache->migrator->decode_import_inode(srcdn, blp, 
@@ -4640,7 +4667,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen
       assert(mdr->more()->inode_import.length() > 0);
       
       // finish cap imports
-      finish_force_open_sessions(mdr->more()->imported_client_map);
+      finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
       if (mdr->more()->cap_imports.count(destdnl->get_inode()))
        mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(), srcdn->authority().first, 
                                                         mdr->more()->cap_imports[destdnl->get_inode()]);
index d830c6dffe7510b8a60dfa66c2a26911495750ae..bf2774824104a56ed84f5ceefb7e40058bbc0c99 100644 (file)
@@ -71,8 +71,10 @@ public:
   void handle_client_session(class MClientSession *m);
   void _session_logged(Session *session, __u64 state_seq, 
                       bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
-  version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm);
-  void finish_force_open_sessions(map<client_t,entity_inst_t> &cm);
+  version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm,
+                                       map<client_t,__u64>& sseqmap);
+  void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
+                                       map<client_t,__u64>& sseqmap);
   void terminate_sessions();
   void find_idle_sessions();
   void kill_session(Session *session);
index e866f1853038af9718ccaf6e837212292688c5f6..515050316344fa434cb4a4d04b7077fa5ab5d103 100644 (file)
@@ -44,16 +44,15 @@ class Session : public RefCountedObject {
 public:
   /*
                     
-        <deleted> <-- closed <------------+--------+
-             ^         |                  |        |
-             |         v                  |        |
-          killing <-- opening  <---+      |        |
-             ^         |   |       |      |        |
-             |         v   |       |      |        |
-           stale <--> open --> closing ---+        |
-             |          ^| |    |                  |
-             |          |v v    v                  |
-             +--------> importing <----------------+
+        <deleted> <-- closed <------------+
+             ^         |                  |
+             |         v                  |
+          killing <-- opening <----+      |
+             ^         |           |      |
+             |         v           |      |
+           stale <--> open --> closing ---+
+
+    + additional dimension of 'importing' (with counter)
 
   */
   static const int STATE_CLOSED = 0;
@@ -62,7 +61,6 @@ public:
   static const int STATE_CLOSING = 3;   // journaling close
   static const int STATE_STALE = 4;
   static const int STATE_KILLING = 5;
-  static const int STATE_IMPORTING = 6;
 
   const char *get_state_name(int s) {
     switch (s) {
@@ -72,7 +70,6 @@ public:
     case STATE_CLOSING: return "closing";
     case STATE_STALE: return "stale";
     case STATE_KILLING: return "killing";
-    case STATE_IMPORTING: return "importing";
     default: return "???";
     }
   }
@@ -80,6 +77,7 @@ public:
 private:
   int state;
   __u64 state_seq;
+  int importing_count;
   friend class SessionMap;
 public:
   entity_inst_t inst;
@@ -128,6 +126,15 @@ public:
   bool is_stale() { return state == STATE_STALE; }
   bool is_killing() { return state == STATE_KILLING; }
 
+  void inc_importing() {
+    ++importing_count;
+  }
+  void dec_importing() {
+    assert(importing_count);
+    --importing_count;
+  }
+  bool is_importing() { return importing_count > 0; }
+
   // -- caps --
 private:
   version_t cap_push_seq;        // cap push seq #
@@ -167,7 +174,7 @@ public:
 
 
   Session() : 
-    state(STATE_CLOSED), state_seq(0),
+    state(STATE_CLOSED), state_seq(0), importing_count(0),
     item_session_list(this),
     requests(0),  // member_offset passed to front() manually
     cap_push_seq(0) { }