]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: open sessions for rejoin imported caps
authorYan, Zheng <zheng.z.yan@intel.com>
Thu, 23 Jan 2014 09:12:47 +0000 (17:12 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Mon, 17 Feb 2014 01:37:52 +0000 (09:37 +0800)
Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDCache.cc
src/mds/MDCache.h
src/messages/MMDSCacheRejoin.h

index fa860e868917dd30c43e6866ebeee65d4282dbb0..c8641e4f69f2ea310f2da3f19ca5ce553d567d9e 100644 (file)
@@ -56,6 +56,7 @@
 #include "events/EImportFinish.h"
 #include "events/EFragment.h"
 #include "events/ECommitted.h"
+#include "events/ESessions.h"
 
 #include "messages/MGenericMessage.h"
 
@@ -3613,11 +3614,24 @@ void MDCache::rejoin_send_rejoins()
   }
 
   if (mds->is_rejoin()) {
+    map<client_t, set<int> > client_exports;
     for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = cap_exports.begin();
          p != cap_exports.end();
         ++p) {
       assert(cap_export_targets.count(p->first));
-      rejoins[cap_export_targets[p->first]]->cap_exports[p->first] = p->second;
+      int target = cap_export_targets[p->first];
+      rejoins[target]->cap_exports[p->first] = p->second;
+      for (map<client_t,ceph_mds_cap_reconnect>::iterator q = p->second.begin();
+          q != p->second.end();
+          ++q)
+       client_exports[q->first].insert(target);
+    }
+    for (map<client_t, set<int> >::iterator p = client_exports.begin();
+        p != client_exports.end();
+        ++p) {
+      entity_inst_t inst = mds->sessionmap.get_inst(entity_name_t::CLIENT(p->first.v));
+      for (set<int>::iterator q = p->second.begin(); q != p->second.end(); ++q)
+       rejoins[*q]->client_map[p->first] = inst;
     }
   }
   
@@ -3970,6 +3984,8 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
     assert(gather_locks.empty());
 
     // check cap exports.
+    rejoin_client_map.insert(weak->client_map.begin(), weak->client_map.end());
+
     for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = weak->cap_exports.begin();
         p != weak->cap_exports.end();
         ++p) {
@@ -5099,11 +5115,34 @@ void MDCache::rejoin_open_ino_finish(inodeno_t ino, int ret)
   cap_imports_num_opening--;
 
   if (cap_imports_num_opening == 0) {
-    if (rejoin_gather.count(mds->get_nodeid()))
-      process_imported_caps();
-    else
+    if (rejoin_gather.empty())
       rejoin_gather_finish();
+    else if (rejoin_gather.count(mds->get_nodeid()))
+      process_imported_caps();
+  }
+}
+
+class C_MDC_RejoinSessionsOpened : public Context {
+  MDCache *cache;
+public:
+  map<client_t,entity_inst_t> client_map;
+  map<client_t,uint64_t> sseqmap;
+
+  C_MDC_RejoinSessionsOpened(MDCache *c, map<client_t,entity_inst_t>& cm) :
+    cache(c), client_map(cm) {}
+  void finish(int r) {
+    assert(r == 0);
+    cache->rejoin_open_sessions_finish(client_map, sseqmap);
   }
+};
+
+void MDCache::rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
+                                         map<client_t,uint64_t>& sseqmap)
+{
+  dout(10) << "rejoin_open_sessions_finish" << dendl;
+  mds->server->finish_force_open_sessions(client_map, sseqmap);
+  if (rejoin_gather.empty())
+    rejoin_gather_finish();
 }
 
 bool MDCache::process_imported_caps()
@@ -5132,6 +5171,22 @@ bool MDCache::process_imported_caps()
 
   // called by rejoin_gather_finish() ?
   if (rejoin_gather.count(mds->get_nodeid()) == 0) {
+    // if sessions for imported caps are all open ?
+    for (map<client_t,entity_inst_t>::iterator p = rejoin_client_map.begin();
+        p != rejoin_client_map.end();
+        ++p) {
+      if (!mds->sessionmap.have_session(entity_name_t::CLIENT(p->first.v))) {
+       C_MDC_RejoinSessionsOpened *finish = new C_MDC_RejoinSessionsOpened(this, rejoin_client_map);
+       version_t pv = mds->server->prepare_force_open_sessions(rejoin_client_map, finish->sseqmap);
+       ESessions *le = new ESessions(pv, rejoin_client_map);
+       mds->mdlog->start_submit_entry(le, finish);
+       mds->mdlog->flush();
+       rejoin_client_map.clear();
+       return true;
+      }
+    }
+    rejoin_client_map.clear();
+
     // process caps that were exported by slave rename
     for (map<inodeno_t,pair<int,map<client_t,Capability::Export> > >::iterator p = rejoin_slave_exports.begin();
         p != rejoin_slave_exports.end();
index 750c6b18813b2f26f4311d69736fe83110193dd4..b3c5ad5535870283244299e16a3ab832e64d7597 100644 (file)
@@ -413,6 +413,7 @@ protected:
   set<int> rejoin_ack_gather;  // nodes from whom i need a rejoin ack
   map<int,map<inodeno_t,map<client_t,Capability::Import> > > rejoin_imported_caps;
   map<inodeno_t,pair<int,map<client_t,Capability::Export> > > rejoin_slave_exports;
+  map<client_t,entity_inst_t> rejoin_client_map;
 
   map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex
   map<inodeno_t,int> cap_export_targets; // ino -> auth mds
@@ -490,7 +491,10 @@ public:
   }
 
   friend class C_MDC_RejoinOpenInoFinish;
+  friend class C_MDC_RejoinSessionsOpened;
   void rejoin_open_ino_finish(inodeno_t ino, int ret);
+  void rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
+                                  map<client_t,uint64_t>& sseqmap);
   bool process_imported_caps();
   void choose_lock_states_and_reconnect_caps();
   void prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino,
index 9cbee3a7bf6c7b5abb1c2d9e57cd7727b0c6075e..bbafe64a35a7eacf7e79649bbe3b8c9f8404dce5 100644 (file)
@@ -168,6 +168,7 @@ class MMDSCacheRejoin : public Message {
 
   // open
   map<inodeno_t,map<client_t, ceph_mds_cap_reconnect> > cap_exports;
+  map<client_t, entity_inst_t> client_map;
   bufferlist imported_caps;
 
   // full
@@ -298,6 +299,7 @@ public:
     ::encode(xlocked_inodes, payload);
     ::encode(wrlocked_inodes, payload);
     ::encode(cap_exports, payload);
+    ::encode(client_map, payload);
     ::encode(imported_caps, payload);
     ::encode(strong_dirfrags, payload);
     ::encode(dirfrag_bases, payload);
@@ -320,6 +322,7 @@ public:
     ::decode(xlocked_inodes, p);
     ::decode(wrlocked_inodes, p);
     ::decode(cap_exports, p);
+    ::decode(client_map, p);
     ::decode(imported_caps, p);
     ::decode(strong_dirfrags, p);
     ::decode(dirfrag_bases, p);