]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: send info of imported caps back to the exporter (cache rejoin)
authorYan, Zheng <zheng.z.yan@intel.com>
Tue, 26 Nov 2013 02:17:30 +0000 (10:17 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Mon, 16 Dec 2013 04:15:24 +0000 (12:15 +0800)
Use cache rejoin ack message to send information of rejoin imported
caps back to the exporter. Also move the code that exports reconnect
caps to MDCache::handle_cache_rejoin_ack()

This is preparation for including counterpart's information in cap
import/export message.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDCache.cc
src/mds/MDCache.h
src/messages/MMDSCacheRejoin.h

index f2520c88c2940b33a49ad758ba3d37ac905d6ba9..d2942708fdbbc875cab2a8e603369ae6a7366468 100644 (file)
@@ -3876,6 +3876,8 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
     dout(10) << "i am a surivivor, and will ack immediately" << dendl;
     ack = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
 
+    map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
+
     // check cap exports
     for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = weak->cap_exports.begin();
         p != weak->cap_exports.end();
@@ -3886,10 +3888,16 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
           q != p->second.end();
           ++q) {
        dout(10) << " claiming cap import " << p->first << " client." << q->first << " on " << *in << dendl;
-       rejoin_import_cap(in, q->first, q->second, from);
+       Capability *cap = rejoin_import_cap(in, q->first, q->second, from);
+       Capability::Import& im = imported_caps[p->first][q->first];
+       im.cap_id = cap->get_cap_id();
+       im.issue_seq = cap->get_last_seq();
+       im.mseq = cap->get_mseq();
       }
       mds->locker->eval(in, CEPH_CAP_LOCKS, true);
     }
+
+    ::encode(imported_caps, ack->imported_caps);
   } else {
     assert(mds->is_rejoin());
 
@@ -4729,6 +4737,35 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
   // belongs to, were trimmed between sending cache rejoin and receiving rejoin ack.
   assert(isolated_inodes.empty());
 
+  map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
+  bufferlist::iterator bp = ack->imported_caps.begin();
+  ::decode(peer_imported, bp);
+
+  for (map<inodeno_t,map<client_t,Capability::Import> >::iterator p = peer_imported.begin();
+       p != peer_imported.end();
+       ++p) {
+    assert(cap_exports.count(p->first));
+    assert(cap_export_targets.count(p->first));
+    assert(cap_export_targets[p->first] == from);
+    for (map<client_t,Capability::Import>::iterator q = p->second.begin();
+        q != p->second.end();
+        ++q) {
+      assert(cap_exports[p->first].count(q->first));
+
+      dout(10) << " exporting caps for client." << q->first << " ino " << p->first << dendl;
+      Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+      assert(session);
+
+      // mark client caps stale.
+      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
+                                      cap_exports[p->first][q->first].cap_id, 0);
+      mds->send_message_client_counted(m, session);
+
+      cap_exports[p->first].erase(q->first);
+    }
+    assert(cap_exports[p->first].empty());
+  }
+
   // done?
   assert(rejoin_ack_gather.count(from));
   rejoin_ack_gather.erase(from);
@@ -4996,33 +5033,28 @@ bool MDCache::process_imported_caps()
       }
       assert(in->is_auth());
       for (map<client_t,map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
-         q != p->second.end();
-         ++q)
+          q != p->second.end();
+          ++q) {
+       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+       assert(session);
        for (map<int,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
-           r != q->second.end();
-           ++r) {
-         dout(20) << " add_reconnected_cap " << in->ino() << " client." << q->first << dendl;
+            r != q->second.end();
+            ++r) {
          add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm));
-         rejoin_import_cap(in, q->first, r->second, r->first);
+         Capability *cap = in->reconnect_cap(q->first, r->second, session);
+         if (r->first >= 0) {
+           do_cap_import(session, in, cap);
+
+           Capability::Import& im = rejoin_imported_caps[r->first][p->first][q->first];
+           im.cap_id = cap->get_cap_id();
+           im.issue_seq = cap->get_last_seq();
+           im.mseq = cap->get_mseq();
+         }
        }
+      }
       cap_imports.erase(p++);  // remove and move on
     }
   } else {
-    for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator q = cap_exports.begin();
-        q != cap_exports.end();
-        ++q) {
-      for (map<client_t,ceph_mds_cap_reconnect>::iterator r = q->second.begin();
-          r != q->second.end();
-          ++r) {
-       dout(10) << " exporting caps for client." << r->first << " ino " << q->first << dendl;
-       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(r->first.v));
-       assert(session);
-       // mark client caps stale.
-       MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, q->first, 0, 0, 0);
-       mds->send_message_client_counted(m, session);
-      }
-    }
-
     trim_non_auth();
 
     rejoin_gather.erase(mds->get_nodeid());
@@ -5174,7 +5206,7 @@ void MDCache::clean_open_file_lists()
 
 
 
-void MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds)
+Capability* MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds)
 {
   dout(10) << "rejoin_import_cap for client." << client << " from mds." << frommds
           << " on " << *in << dendl;
@@ -5187,6 +5219,8 @@ void MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconn
     cap->rejoin_import();
     do_cap_import(session, in, cap);
   }
+
+  return cap;
 }
 
 void MDCache::export_remaining_imported_caps()
@@ -5594,9 +5628,12 @@ void MDCache::rejoin_send_acks()
   // send acks
   for (map<int,MMDSCacheRejoin*>::iterator p = ack.begin();
        p != ack.end();
-       ++p) 
+       ++p) {
+    ::encode(rejoin_imported_caps[p->first], p->second->imported_caps);
     mds->send_message_mds(p->second, p->first);
-  
+  }
+
+  rejoin_imported_caps.clear();
 }
 
 
index 3463efcf750eea586d678c53a4d7668e4796070c..fd66a32a03b4217167789ac7e99f9f100359d968 100644 (file)
@@ -411,6 +411,7 @@ protected:
   set<int> rejoin_gather;      // nodes from whom i need a rejoin
   set<int> rejoin_sent;        // nodes i sent a rejoin to
   set<int> rejoin_ack_gather;  // nodes from whom i need a rejoin ack
+  map<int,map<inodeno_t,map<client_t,Capability::Import> > > rejoin_imported_caps;
 
   map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex
   map<inodeno_t,int> cap_export_targets; // ino -> auth mds
@@ -494,7 +495,7 @@ public:
                           map<client_t,MClientSnap*>& splits);
   void do_realm_invalidate_and_update_notify(CInode *in, int snapop, bool nosend=false);
   void send_snaps(map<client_t,MClientSnap*>& splits);
-  void rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds);
+  Capability* rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap_reconnect& icr, int frommds);
   void finish_snaprealm_reconnect(client_t client, SnapRealm *realm, snapid_t seq);
   void try_reconnect_cap(CInode *in, Session *session);
   void export_remaining_imported_caps();
index 4f079a91126e800e47c9b6569feb9eef16286c4d..d48cd67a4285a507073966a9c0d75ffefe9862d1 100644 (file)
@@ -168,6 +168,7 @@ class MMDSCacheRejoin : public Message {
 
   // open
   map<inodeno_t,map<client_t, ceph_mds_cap_reconnect> > cap_exports;
+  bufferlist imported_caps;
 
   // full
   bufferlist inode_base;
@@ -299,6 +300,7 @@ public:
     ::encode(xlocked_inodes, payload);
     ::encode(wrlocked_inodes, payload);
     ::encode(cap_exports, payload);
+    ::encode(imported_caps, payload);
     ::encode(strong_dirfrags, payload);
     ::encode(dirfrag_bases, payload);
     ::encode(weak, payload);
@@ -320,6 +322,7 @@ public:
     ::decode(xlocked_inodes, p);
     ::decode(wrlocked_inodes, p);
     ::decode(cap_exports, p);
+    ::decode(imported_caps, p);
     ::decode(strong_dirfrags, p);
     ::decode(dirfrag_bases, p);
     ::decode(weak, p);