]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: re-send cap exports in resolve message.
authorYan, Zheng <zheng.z.yan@intel.com>
Tue, 26 Nov 2013 07:10:29 +0000 (15:10 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Mon, 16 Dec 2013 04:15:25 +0000 (12:15 +0800)
For rename operation that changes inode's authority, if master mds
of the operation crashed, inode's original auth mds sends export
messages to clients when it receives the master mds' resolve ack
message, Client can't reply on the export message to add caps for
the master mds, then reconnect the cap when the master mds enters
reconnect stage. Because client may receive the export message after
receiving mdsmap that claims the master mds is in reconnect stage.

The fix is include cap exports in resolve message, so the master mds
can send import messages to clients when it enters the rejoin stage.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/MDCache.cc
src/mds/MDCache.h
src/messages/MMDSResolve.h
src/messages/MMDSResolveAck.h

index 811f31b0b952651ab0f32a0d591e0973ee21e6a8..2747ca4c06973e9e7c4201ce3ca259f5d3d1909b 100644 (file)
@@ -2513,7 +2513,18 @@ void MDCache::send_slave_resolves()
        dout(10) << " including uncommitted " << *p->second << dendl;
        if (!resolves.count(master))
          resolves[master] = new MMDSResolve;
-       resolves[master]->add_slave_request(p->first);
+       if (p->second->has_more() && p->second->more()->is_inode_exporter) {
+         // re-send cap exports
+         CInode *in = p->second->more()->rename_inode;
+         map<client_t, Capability::Export> cap_map;
+         in->export_client_caps(cap_map);
+         bufferlist bl;
+         ::encode(in->ino(), bl);
+         ::encode(cap_map, bl);
+         resolves[master]->add_slave_request(p->first, bl);
+       } else {
+         resolves[master]->add_slave_request(p->first);
+       }
       }
     }
   }
@@ -2854,11 +2865,11 @@ void MDCache::handle_resolve(MMDSResolve *m)
 
   // ambiguous slave requests?
   if (!m->slave_requests.empty()) {
-    for (vector<metareqid_t>::iterator p = m->slave_requests.begin();
+    for (map<metareqid_t, bufferlist>::iterator p = m->slave_requests.begin();
         p != m->slave_requests.end();
         ++p) {
-      if (uncommitted_masters.count(*p) && !uncommitted_masters[*p].safe)
-       pending_masters.insert(*p);
+      if (uncommitted_masters.count(p->first) && !uncommitted_masters[p->first].safe)
+       pending_masters.insert(p->first);
     }
 
     if (!pending_masters.empty()) {
@@ -2868,18 +2879,47 @@ void MDCache::handle_resolve(MMDSResolve *m)
     }
 
     MMDSResolveAck *ack = new MMDSResolveAck;
-    for (vector<metareqid_t>::iterator p = m->slave_requests.begin();
+    for (map<metareqid_t, bufferlist>::iterator p = m->slave_requests.begin();
         p != m->slave_requests.end();
         ++p) {
-      if (uncommitted_masters.count(*p)) {  //mds->sessionmap.have_completed_request(*p)) {
+      if (uncommitted_masters.count(p->first)) {  //mds->sessionmap.have_completed_request(p->first)) {
        // COMMIT
        dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
-       ack->add_commit(*p);
-       uncommitted_masters[*p].slaves.insert(from);   // wait for slave OP_COMMITTED before we log ECommitted
+       ack->add_commit(p->first);
+       uncommitted_masters[p->first].slaves.insert(from);   // wait for slave OP_COMMITTED before we log ECommitted
+
+       if (p->second.length() > 0) {
+         // slave wants to export caps (rename)
+         assert(mds->is_resolve());
+
+         inodeno_t ino;
+         map<client_t,Capability::Export> cap_exports;
+         bufferlist::iterator q = p->second.begin();
+         ::decode(ino, q);
+         ::decode(cap_exports, q);
+
+         assert(get_inode(ino));
+
+         for (map<client_t,Capability::Export>::iterator q = cap_exports.begin();
+             q != cap_exports.end();
+             ++q) {
+           Capability::Import& im = rejoin_imported_caps[from][ino][q->first];
+           im.cap_id = ++last_cap_id; // assign a new cap ID
+           im.issue_seq = 1;
+           im.mseq = q->second.mseq;
+         }
+
+         // will process these caps in rejoin stage
+         rejoin_slave_exports[ino].first = from;
+         rejoin_slave_exports[ino].second.swap(cap_exports);
+
+         // send information of imported caps back to slave
+         ::encode(rejoin_imported_caps[from][ino], ack->commit[p->first]);
+       }
       } else {
        // ABORT
        dout(10) << " ambiguous slave request " << *p << " will ABORT" << dendl;
-       ack->add_abort(*p);      
+       ack->add_abort(p->first);
       }
     }
     mds->send_message(ack, m->get_connection());
@@ -3037,30 +3077,34 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
     assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
   }
 
-  for (vector<metareqid_t>::iterator p = ack->commit.begin();
+  for (map<metareqid_t, bufferlist>::iterator p = ack->commit.begin();
        p != ack->commit.end();
        ++p) {
-    dout(10) << " commit on slave " << *p << dendl;
+    dout(10) << " commit on slave " << p->first << dendl;
     
     if (ambiguous_slave_updates.count(from)) {
-      remove_ambiguous_slave_update(*p, from);
+      remove_ambiguous_slave_update(p->first, from);
       continue;
     }
 
     if (mds->is_resolve()) {
       // replay
-      MDSlaveUpdate *su = get_uncommitted_slave_update(*p, from);
+      MDSlaveUpdate *su = get_uncommitted_slave_update(p->first, from);
       assert(su);
 
       // log commit
-      mds->mdlog->start_submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from,
+      mds->mdlog->start_submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", p->first, from,
                                                      ESlaveUpdate::OP_COMMIT, su->origop));
-      mds->mdlog->wait_for_safe(new C_MDC_SlaveCommit(this, from, *p));
+      mds->mdlog->wait_for_safe(new C_MDC_SlaveCommit(this, from, p->first));
       mds->mdlog->flush();
 
-      finish_uncommitted_slave_update(*p, from);
+      finish_uncommitted_slave_update(p->first, from);
     } else {
-      MDRequest *mdr = request_get(*p);
+      MDRequest *mdr = request_get(p->first);
+      // information about master imported caps
+      if (p->second.length() > 0)
+       mdr->more()->inode_import.claim(p->second);
+
       assert(mdr->slave_request == 0);  // shouldn't be doing anything!
       request_finish(mdr);
     }
@@ -5000,8 +5044,9 @@ bool MDCache::process_imported_caps()
 {
   dout(10) << "process_imported_caps" << dendl;
 
-  map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p;
-  for (p = cap_imports.begin(); p != cap_imports.end(); ++p) {
+  for (map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin();
+       p != cap_imports.end();
+       ++p) {
     CInode *in = get_inode(p->first);
     if (in) {
       assert(in->is_auth());
@@ -5021,10 +5066,38 @@ bool MDCache::process_imported_caps()
 
   // called by rejoin_gather_finish() ?
   if (rejoin_gather.count(mds->get_nodeid()) == 0) {
+    // process caps that were exported by slave rename
+    for (map<inodeno_t,pair<int,map<client_t,Capability::Export> > >::iterator p = rejoin_slave_exports.begin();
+        p != rejoin_slave_exports.end();
+        ++p) {
+      CInode *in = get_inode(p->first);
+      for (map<client_t,Capability::Export>::iterator q = p->second.second.begin();
+          q != p->second.second.end();
+          ++q) {
+       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+       assert(session);
+
+       Capability *cap = in->get_client_cap(q->first);
+       if (!cap)
+         cap = in->add_client_cap(q->first, session);
+       cap->merge(q->second, true);
+
+       Capability::Import& im = rejoin_imported_caps[p->second.first][p->first][q->first];
+       assert(cap->get_last_seq() == im.issue_seq);
+       assert(cap->get_mseq() == im.mseq);
+       cap->set_cap_id(im.cap_id);
+       // send cap import because we assigned a new cap ID
+       do_cap_import(session, in, cap, q->second.cap_id, q->second.seq, q->second.mseq - 1,
+                     p->second.first, CEPH_CAP_FLAG_AUTH);
+      }
+    }
+    rejoin_slave_exports.clear();
+    rejoin_imported_caps.clear();
+
     // process cap imports
     //  ino -> client -> frommds -> capex
-    p = cap_imports.begin();
-    while (p != cap_imports.end()) {
+    for (map<inodeno_t,map<client_t, map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin();
+        p != cap_imports.end(); ) {
       CInode *in = get_inode(p->first);
       if (!in) {
        dout(10) << " still missing ino " << p->first
index 0aa63c771b04dbe2212f7edcd08e1cb4fced43de..c61fe394c4f3c00ba4764a29df2935e98f3b7890 100644 (file)
@@ -412,6 +412,7 @@ protected:
   set<int> rejoin_sent;        // nodes i sent a rejoin to
   set<int> rejoin_ack_gather;  // nodes from whom i need a rejoin ack
   map<int,map<inodeno_t,map<client_t,Capability::Import> > > rejoin_imported_caps;
+  map<inodeno_t,pair<int,map<client_t,Capability::Export> > > rejoin_slave_exports;
 
   map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex
   map<inodeno_t,int> cap_export_targets; // ino -> auth mds
index 551ca991a479e24c340da302b3dece0a541dc62c..263bd54750d4f6b8b3e34f38839da75c12a99ab2 100644 (file)
@@ -23,7 +23,7 @@ class MMDSResolve : public Message {
  public:
   map<dirfrag_t, vector<dirfrag_t> > subtrees;
   map<dirfrag_t, vector<dirfrag_t> > ambiguous_imports;
-  vector<metareqid_t> slave_requests;
+  map<metareqid_t, bufferlist> slave_requests;
 
   MMDSResolve() : Message(MSG_MDS_RESOLVE) {}
 private:
@@ -50,7 +50,11 @@ public:
   }
 
   void add_slave_request(metareqid_t reqid) {
-    slave_requests.push_back(reqid);
+    slave_requests[reqid].clear();
+  }
+
+  void add_slave_request(metareqid_t reqid, bufferlist& bl) {
+    slave_requests[reqid].claim(bl);
   }
 
   void encode_payload(uint64_t features) {
index 2118abaaf49952b2c536131531cace270ce1bb44..743353fef95657ceb65e5c9b44689e13a2520c30 100644 (file)
@@ -22,7 +22,7 @@
 
 class MMDSResolveAck : public Message {
  public:
-  vector<metareqid_t> commit;
+  map<metareqid_t, bufferlist> commit;
   vector<metareqid_t> abort;
 
   MMDSResolveAck() : Message(MSG_MDS_RESOLVEACK) {}
@@ -39,7 +39,7 @@ public:
   */
   
   void add_commit(metareqid_t r) {
-    commit.push_back(r);
+    commit[r].clear();
   }
   void add_abort(metareqid_t r) {
     abort.push_back(r);