]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: include counterpart's information in cap import/export messages
author: Yan, Zheng <zheng.z.yan@intel.com>
Tue, 26 Nov 2013 03:02:49 +0000 (11:02 +0800)
committer: Yan, Zheng <zheng.z.yan@intel.com>
Mon, 16 Dec 2013 04:15:25 +0000 (12:15 +0800)
When exporting inodes with client caps, the importer sends cap import
messages to clients and the exporter sends cap export messages to clients.
A client can receive these two messages in any order. If a client first
receives the cap import message, it adds the imported caps, but the caps
from the exporter are still considered valid. This can compromise
consistency. If an MDS crashes while importing caps, clients may receive
only cap export messages and never receive the cap import messages.
These clients don't know which MDS is the cap importer, so they can't
send cap reconnect when the MDS recovers.

We can handle the above issues by including the counterpart's information
in cap import/export messages. If a client first receives the cap import
message, it adds the imported caps, then removes the exporter's
caps. If a client first receives the cap export message, it removes the
exported caps, then adds caps for the importer.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/include/ceph_fs.h
src/include/types.h
src/mds/CInode.cc
src/mds/Capability.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Migrator.cc
src/mds/Migrator.h
src/mds/Server.cc
src/messages/MClientCaps.h

index f1f286faf71d33f36217acbb87b9b1b08aa23250..dabc40a9d84dbb0671922c21cb3cf23720eb78fd 100644 (file)
@@ -665,6 +665,15 @@ struct ceph_mds_caps {
        __le32 time_warp_seq;
 } __attribute__ ((packed));
 
+/* extra info for cap import/export */
+struct ceph_mds_cap_peer {
+       __le64 cap_id;
+       __le32 seq;
+       __le32 mseq;
+       __le32 mds;
+       __u8   flags;
+} __attribute__ ((packed));
+
 /* cap release msg head */
 struct ceph_mds_cap_release {
        __le32 num;                /* number of cap_items that follow */
index 5a9e6f6d4c9dd2a34f55a36b9ad50fc36583f901..33304521f90967a553c22fbe9366d39594918464 100644 (file)
@@ -242,6 +242,7 @@ WRITE_RAW_ENCODER(ceph_mds_request_head)
 WRITE_RAW_ENCODER(ceph_mds_request_release)
 WRITE_RAW_ENCODER(ceph_filelock)
 WRITE_RAW_ENCODER(ceph_mds_caps)
+WRITE_RAW_ENCODER(ceph_mds_cap_peer)
 WRITE_RAW_ENCODER(ceph_mds_cap_release)
 WRITE_RAW_ENCODER(ceph_mds_cap_item)
 WRITE_RAW_ENCODER(ceph_mds_lease)
index 9c9360a0abf2f6bce7ced4ddb7e1e3598fb20361..d5d5a19a4352ee58a0928b93c878179058afc56f 100644 (file)
@@ -2555,8 +2555,8 @@ Capability *CInode::reconnect_cap(client_t client, ceph_mds_cap_reconnect& icr,
     cap->set_wanted(icr.wanted);
     cap->issue_norevoke(icr.issued);
     cap->reset_seq();
+    cap->set_cap_id(icr.cap_id);
   }
-  cap->set_cap_id(icr.cap_id);
   cap->set_last_issue_stamp(ceph_clock_now(g_ceph_context));
   return cap;
 }
index 6f98aec05de09ceb1043fa684debde9f3b5ee0a9..c04a05285cf335750b20e9c07c062c414443a3a5 100644 (file)
@@ -246,6 +246,7 @@ public:
   }
   
   ceph_seq_t get_mseq() { return mseq; }
+  void inc_mseq() { mseq++; }
 
   ceph_seq_t get_last_sent() { return last_sent; }
   utime_t get_last_issue_stamp() { return last_issue_stamp; }
@@ -288,7 +289,6 @@ public:
   Export make_export() {
     return Export(cap_id, _wanted, issued(), pending(), client_follows, last_sent, mseq+1, last_issue_stamp);
   }
-  void rejoin_import() { mseq++; }
   void merge(Export& other, bool auth_cap) {
     // issued + pending
     int newpending = other.pending | pending();
index 21289a1b334621b46f13d5eef7e5b91352f08cb8..811f31b0b952651ab0f32a0d591e0973ee21e6a8 100644 (file)
@@ -4759,6 +4759,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
       // mark client caps stale.
       MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
                                       cap_exports[p->first][q->first].cap_id, 0);
+      m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, from, 0);
       mds->send_message_client_counted(m, session);
 
       cap_exports[p->first].erase(q->first);
@@ -5043,7 +5044,9 @@ bool MDCache::process_imported_caps()
          add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm));
          Capability *cap = in->reconnect_cap(q->first, r->second, session);
          if (r->first >= 0) {
-           do_cap_import(session, in, cap);
+           if (cap->get_last_seq() == 0) // don't increase mseq if cap already exists
+             cap->inc_mseq();
+           do_cap_import(session, in, cap, r->second.cap_id, 0, 0, r->first, 0);
 
            Capability::Import& im = rejoin_imported_caps[r->first][p->first][q->first];
            im.cap_id = cap->get_cap_id();
@@ -5216,8 +5219,9 @@ Capability* MDCache::rejoin_import_cap(CInode *in, client_t client, ceph_mds_cap
   Capability *cap = in->reconnect_cap(client, icr, session);
 
   if (frommds >= 0) {
-    cap->rejoin_import();
-    do_cap_import(session, in, cap);
+    if (cap->get_last_seq() == 0) // don't increase mseq if cap already exists
+      cap->inc_mseq();
+    do_cap_import(session, in, cap, icr.cap_id, 0, 0, frommds, 0);
   }
 
   return cap;
@@ -5240,6 +5244,7 @@ void MDCache::export_remaining_imported_caps()
       if (session) {
        // mark client caps stale.
        MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0);
+       stale->set_cap_peer(0, 0, 0, -1, 0);
        mds->send_message_client_counted(stale, q->first);
       }
     }
@@ -5279,7 +5284,9 @@ void MDCache::try_reconnect_cap(CInode *in, Session *session)
 // -------
 // cap imports and delayed snap parent opens
 
-void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap)
+void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap,
+                           uint64_t p_cap_id, ceph_seq_t p_seq, ceph_seq_t p_mseq,
+                           int peer, int p_flags)
 {
   client_t client = session->info.inst.name.num();
   SnapRealm *realm = in->find_snaprealm();
@@ -5296,6 +5303,7 @@ void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap)
                                        cap->get_mseq());
     in->encode_cap_message(reap, cap);
     realm->build_snap_trace(reap->snapbl);
+    reap->set_cap_peer(p_cap_id, p_seq, p_mseq, peer, p_flags);
     mds->send_message_client_counted(reap, session);
   } else {
     dout(10) << "do_cap_import missing past snap parents, delaying " << session->info.inst.name << " mseq "
@@ -5311,6 +5319,8 @@ void MDCache::do_delayed_cap_imports()
 {
   dout(10) << "do_delayed_cap_imports" << dendl;
 
+  assert(delayed_imported_caps.empty());
+#if 0
   map<client_t,set<CInode*> > d;
   d.swap(delayed_imported_caps);
 
@@ -5335,6 +5345,7 @@ void MDCache::do_delayed_cap_imports()
        mds->locker->issue_caps(in);
     }
   }    
+#endif
 }
 
 struct C_MDC_OpenSnapParents : public Context {
index fd66a32a03b4217167789ac7e99f9f100359d968..0aa63c771b04dbe2212f7edcd08e1cb4fced43de 100644 (file)
@@ -505,7 +505,9 @@ public:
   map<CInode*,map<client_t, set<inodeno_t> > > missing_snap_parents; 
   map<client_t,set<CInode*> > delayed_imported_caps;
 
-  void do_cap_import(Session *session, CInode *in, Capability *cap);
+  void do_cap_import(Session *session, CInode *in, Capability *cap,
+                    uint64_t p_cap_id, ceph_seq_t p_seq, ceph_seq_t p_mseq,
+                    int peer, int p_flags);
   void do_delayed_cap_imports();
   void check_realm_past_parents(SnapRealm *realm);
   void open_snap_parents();
index 6fe4c785b382ae739a1244809b598dc088d6514c..30b037cc505ebc355f802ba2a41d092203b1629e 100644 (file)
@@ -1186,7 +1186,8 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl,
     exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first.v));
 }
 
-void Migrator::finish_export_inode_caps(CInode *in)
+void Migrator::finish_export_inode_caps(CInode *in, int peer,
+                                       map<client_t,Capability::Import>& peer_imported)
 {
   dout(20) << "finish_export_inode_caps " << *in << dendl;
 
@@ -1198,21 +1199,23 @@ void Migrator::finish_export_inode_caps(CInode *in)
        it != in->client_caps.end();
        ++it) {
     Capability *cap = it->second;
-    dout(7) << "finish_export_inode telling client." << it->first
+    dout(7) << "finish_export_inode_caps telling client." << it->first
            << " exported caps on " << *in << dendl;
-    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT,
-                                    in->ino(),
-                                    in->find_snaprealm()->inode->ino(),
-                                    cap->get_cap_id(), cap->get_last_seq(), 
-                                    cap->pending(), cap->wanted(), 0,
-                                    cap->get_mseq());
+    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0,
+                                    cap->get_cap_id(), cap->get_mseq());
+
+    map<client_t,Capability::Import>::iterator q = peer_imported.find(it->first);
+    assert(q != peer_imported.end());
+    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0);
     mds->send_message_client_counted(m, it->first);
   }
   in->clear_client_caps_after_export();
   mds->locker->eval(in, CEPH_CAP_LOCKS);
 }
 
-void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& finished)
+void Migrator::finish_export_inode(CInode *in, utime_t now, int peer,
+                                  map<client_t,Capability::Import>& peer_imported,
+                                  list<Context*>& finished)
 {
   dout(12) << "finish_export_inode " << *in << dendl;
 
@@ -1254,7 +1257,7 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& fini
 
   in->finish_export(now);
   
-  finish_export_inode_caps(in);
+  finish_export_inode_caps(in, peer, peer_imported);
 
   // *** other state too?
 
@@ -1355,7 +1358,9 @@ int Migrator::encode_export_dir(bufferlist& exportbl,
   return num_exported;
 }
 
-void Migrator::finish_export_dir(CDir *dir, list<Context*>& finished, utime_t now)
+void Migrator::finish_export_dir(CDir *dir, utime_t now, int peer,
+                                map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
+                                list<Context*>& finished)
 {
   dout(10) << "finish_export_dir " << *dir << dendl;
 
@@ -1389,7 +1394,7 @@ void Migrator::finish_export_dir(CDir *dir, list<Context*>& finished, utime_t no
 
     // inode?
     if (dn->get_linkage()->is_primary()) {
-      finish_export_inode(in, now, finished);
+      finish_export_inode(in, now, peer, peer_imported[in->ino()], finished);
 
       // subdirs?
       in->get_nested_dirfrags(subdirs);
@@ -1398,7 +1403,7 @@ void Migrator::finish_export_dir(CDir *dir, list<Context*>& finished, utime_t no
 
   // subdirs
   for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); ++it) 
-    finish_export_dir(*it, finished, now);
+    finish_export_dir(*it, now, peer, peer_imported, finished);
 }
 
 class C_MDS_ExportFinishLogged : public Context {
@@ -1665,7 +1670,8 @@ void Migrator::export_finish(CDir *dir)
   
   // finish export (adjust local cache state)
   C_Contexts *fin = new C_Contexts(g_ceph_context);
-  finish_export_dir(dir, fin->contexts, ceph_clock_now(g_ceph_context));
+  finish_export_dir(dir, ceph_clock_now(g_ceph_context),
+                   it->second.peer, it->second.peer_imported, fin->contexts);
   dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
 
   // unfreeze
@@ -2382,7 +2388,8 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from,
   for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
        p != it->second.peer_exports.end();
        ++p) {
-    finish_import_inode_caps(p->first, true, p->second, imported_caps[p->first->ino()]);
+
+    finish_import_inode_caps(p->first, from, true, p->second, imported_caps[p->first->ino()]);
   }
   
   // send notify's etc.
@@ -2561,7 +2568,7 @@ void Migrator::decode_import_inode_caps(CInode *in,
   }
 }
 
-void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap,
+void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap,
                                        map<client_t,Capability::Export> &export_map,
                                        map<client_t,Capability::Import> &import_map)
 {
@@ -2583,7 +2590,9 @@ void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap,
     im.issue_seq = cap->get_last_seq() + 1;
 
     cap->merge(it->second, auth_cap);
-    mds->mdcache->do_cap_import(session, in, cap);
+    mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
+                               it->second.seq, it->second.mseq - 1, peer,
+                               auth_cap ? CEPH_CAP_FLAG_AUTH : 0);
   }
 
   in->replica_caps_wanted = 0;
@@ -2863,7 +2872,7 @@ void Migrator::logged_import_caps(CInode *in,
   map<client_t,Capability::Import> imported_caps;
 
   assert(peer_exports.count(in));
-  finish_import_inode_caps(in, false, peer_exports[in], imported_caps);
+  finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
   mds->locker->eval(in, CEPH_CAP_LOCKS, true);
 
   mds->send_message_mds(new MExportCapsAck(in->ino()), from);
index 10d54511388ba9e9324d9bc76f18f0d11697aa90..58dce1cb67a228dbdf494c0ece27692602a0354a 100644 (file)
@@ -238,14 +238,20 @@ public:
                           map<client_t,entity_inst_t>& exported_client_map);
   void encode_export_inode_caps(CInode *in, bufferlist& bl,
                                map<client_t,entity_inst_t>& exported_client_map);
-  void finish_export_inode(CInode *in, utime_t now, list<Context*>& finished);
-  void finish_export_inode_caps(CInode *in);
+  void finish_export_inode(CInode *in, utime_t now, int target,
+                          map<client_t,Capability::Import>& peer_imported,
+                          list<Context*>& finished);
+  void finish_export_inode_caps(CInode *in, int target,
+                               map<client_t,Capability::Import>& peer_imported);
+
 
   int encode_export_dir(bufferlist& exportbl,
                        CDir *dir,
                        map<client_t,entity_inst_t>& exported_client_map,
                        utime_t now);
-  void finish_export_dir(CDir *dir, list<Context*>& finished, utime_t now);
+  void finish_export_dir(CDir *dir, utime_t now, int target,
+                        map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
+                        list<Context*>& finished);
 
   void add_export_finish_waiter(CDir *dir, Context *c) {
     map<CDir*, export_state_t>::iterator it = export_state.find(dir);
@@ -299,7 +305,7 @@ public:
   void decode_import_inode_caps(CInode *in,
                                bufferlist::iterator &blp,
                                map<CInode*, map<client_t,Capability::Export> >& cap_imports);
-  void finish_import_inode_caps(CInode *in, bool auth_cap,
+  void finish_import_inode_caps(CInode *in, int from, bool auth_cap,
                                map<client_t,Capability::Export> &export_map,
                                map<client_t,Capability::Import> &import_map);
   int decode_import_dir(bufferlist::iterator& blp,
index 1d249403ddb7344739de831ae75feef307759a13..8242d226ff57facc5922a31b4fcd634ac29fbc8e 100644 (file)
@@ -6340,7 +6340,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen
       finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
       if (mdr->more()->cap_imports.count(destdnl->get_inode())) {
        mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(),
-                                                        mdr->more()->srcdn_auth_mds,
+                                                        mdr->more()->srcdn_auth_mds, true,
                                                         mdr->more()->cap_imports[destdnl->get_inode()],
                                                         imported_caps);
       }
@@ -6734,7 +6734,8 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
       ::decode(peer_imported, bp);
 
       dout(10) << " finishing inode export on " << *destdnl->get_inode() << dendl;
-      mdcache->migrator->finish_export_inode(destdnl->get_inode(), mdr->now, finished);
+      mdcache->migrator->finish_export_inode(destdnl->get_inode(), mdr->now,
+                                            mdr->slave_to_mds, peer_imported, finished);
       mds->queue_waiters(finished);   // this includes SINGLEAUTH waiters.
 
       // unfreeze
index 117f24162e7d6b8a153dbfeade522b0d6e6eb6bb..04a393d2c7f1e6444c29d964bae74c789f48f519 100644 (file)
 
 class MClientCaps : public Message {
 
-  static const int HEAD_VERSION = 2;   // added flock metadata
+  static const int HEAD_VERSION = 3;   // added flock metadata
   static const int COMPAT_VERSION = 1;
 
  public:
   struct ceph_mds_caps head;
+  struct ceph_mds_cap_peer peer;
   bufferlist snapbl;
   bufferlist xattrbl;
   bufferlist flockbl;
@@ -73,6 +74,14 @@ class MClientCaps : public Message {
   void set_mtime(const utime_t &t) { t.encode_timeval(&head.mtime); }
   void set_atime(const utime_t &t) { t.encode_timeval(&head.atime); }
 
+  void set_cap_peer(uint64_t id, ceph_seq_t seq, ceph_seq_t mseq, int mds, int flags) {
+    peer.cap_id = id;
+    peer.seq = seq;
+    peer.mseq = mseq;
+    peer.mds = mds;
+    peer.flags = flags;
+  }
+
   MClientCaps()
     : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION) { }
   MClientCaps(int op,
@@ -95,6 +104,7 @@ class MClientCaps : public Message {
     head.wanted = wanted;
     head.dirty = dirty;
     head.migrate_seq = mseq;
+    peer.cap_id = 0;
   }
   MClientCaps(int op,
              inodeno_t ino, inodeno_t realm,
@@ -106,6 +116,7 @@ class MClientCaps : public Message {
     head.realm = realm;
     head.cap_id = id;
     head.migrate_seq = mseq;
+    peer.cap_id = 0;
   }
 private:
   ~MClientCaps() {}
@@ -151,10 +162,22 @@ public:
     // conditionally decode flock metadata
     if (header.version >= 2)
       ::decode(flockbl, p);
+
+    if (header.version >= 3) {
+      if (head.op == CEPH_CAP_OP_IMPORT)
+       ::decode(peer, p);
+      else if (head.op == CEPH_CAP_OP_EXPORT)
+       memcpy(&peer, &head.size, sizeof(peer));
+    }
   }
   void encode_payload(uint64_t features) {
     head.snap_trace_len = snapbl.length();
     head.xattr_len = xattrbl.length();
+
+    // record peer in unused fields of cap export message
+    if (head.op == CEPH_CAP_OP_EXPORT)
+      memcpy(&head.size, &peer, sizeof(peer));
+
     ::encode(head, payload);
     ::encode_nohead(snapbl, payload);
 
@@ -164,7 +187,16 @@ public:
     if (features & CEPH_FEATURE_FLOCK) {
       ::encode(flockbl, payload);
     } else {
-      header.version = 1;  // old
+      header.version = 1;
+      return;
+    }
+
+    if (true) {
+      if (head.op == CEPH_CAP_OP_IMPORT)
+       ::encode(peer, payload);
+    } else {
+      header.version = 2;
+      return;
     }
   }
 };