git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: simplify caps import/export behavior
author Sage Weil <sage@newdream.net>
Thu, 5 Jun 2008 18:53:18 +0000 (11:53 -0700)
committer Sage Weil <sage@newdream.net>
Thu, 5 Jun 2008 18:53:18 +0000 (11:53 -0700)
src/client/Client.cc
src/client/Client.h
src/include/ceph_fs.h
src/mds/Capability.h
src/mds/Locker.cc
src/mds/MDCache.cc
src/mds/Migrator.cc
src/mds/Server.cc
src/messages/MClientFileCaps.h
src/messages/MClientReply.h
src/vstartnew.sh

index 5789e9f55798f3ae276e12a75799636cc18166d5..3a1a4cc0b5912f41eebc4a330487918bf82165c8 100644 (file)
@@ -1183,27 +1183,28 @@ void Client::send_reconnect(int mds)
     for (hash_map<inodeno_t, Inode*>::iterator p = inode_map.begin();
         p != inode_map.end();
         p++) {
-      if (p->second->caps.count(mds)) {
+      Inode *in = p->second;
+      if (in->caps.count(mds)) {
        dout(10) << " caps on " << p->first
-                << " " << cap_string(p->second->caps[mds].issued)
-                << " wants " << cap_string(p->second->caps_wanted())
+                << " " << cap_string(in->caps[mds].issued)
+                << " wants " << cap_string(in->caps_wanted())
                 << dendl;
-       p->second->caps[mds].seq = 0;  // reset seq.
+       in->caps[mds].seq = 0;  // reset seq.
        m->add_inode_caps(p->first,    // ino
-                         p->second->caps_wanted(), // wanted
-                         p->second->caps[mds].issued,     // issued
-                         p->second->inode.size, p->second->inode.mtime, p->second->inode.atime);
+                         in->caps_wanted(), // wanted
+                         in->caps[mds].issued,     // issued
+                         in->inode.size, in->inode.mtime, in->inode.atime);
        filepath path;
-       p->second->make_path(path);
+       in->make_path(path);
        dout(10) << " path on " << p->first << " is " << path << dendl;
        m->add_inode_path(p->first, path.get_path());
       }
-      /*
-      if (p->second->stale_caps.count(mds)) {
-       dout(10) << " clearing stale caps on " << p->first << dendl;
-       p->second->stale_caps.erase(mds);         // hrm, is this right?
+      if (in->exporting_mds == mds) {
+       dout(10) << " clearing exporting_caps on " << p->first << dendl;
+       in->exporting_mds = -1;
+       in->exporting_issued = 0;
+       in->exporting_mseq = 0;
       }
-      */
     }
 
     // reset my cap seq number
@@ -1408,22 +1409,19 @@ void Client::check_caps(Inode *in)
                                             in->inode, 
                                              it->second.seq,
                                              it->second.issued,
-                                             wanted);
+                                             wanted,
+                                            0);
     in->reported_size = in->inode.size;
     m->set_max_size(in->wanted_max_size);
     in->requested_max_size = in->wanted_max_size;
     messenger->send_message(m, mdsmap->get_inst(it->first));
-    if (wanted == 0) {
-      it->second.seq = 0;
-      if (it->second.can_drop()) {
-       mds_sessions[it->first].num_caps--;
-       in->caps.erase(it);
-      }
-    }
+    if (wanted == 0)
+      mds_sessions[it->first].num_caps--;
   }
 
-  if (in->caps.empty()) {
+  if (wanted == 0) {
     dout(10) << "last caps on " << *in << dendl;
+    in->caps.clear();
     put_inode(in);
   }
 }
@@ -1530,74 +1528,85 @@ void Client::handle_file_caps(MClientFileCaps *m)
     return;
   }
   
-  // reap?
   if (m->get_op() == CEPH_CAP_OP_IMPORT) {
-    int other = m->get_migrate_mds();
-
-    /*
-     * FIXME: handle mds failures
-     */
-
     // fresh from new mds?
     if (!in->caps.count(mds)) {
       mds_sessions[mds].num_caps++;
-      if (in->caps.empty()) in->get();
-      in->caps[mds].seq = m->get_seq();
-      in->caps[mds].issued = m->get_caps();
-    }
-
-    if (in->caps.count(other) &&
-       in->caps[other].issued_exporting) {
-      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other
-             << " - already saw EXPORT, done" << dendl;
-      in->caps[other].issued_exporting = 0;
-      if (in->caps[other].can_drop()) {
-       mds_sessions[other].num_caps--;
-       in->caps.erase(other);
+      if (in->caps.empty())
+       in->get();
+      if (in->exporting_mds == mds) {
+       dout(10) << " clearing exporting_caps on " << mds << dendl;
+       in->exporting_mds = -1;
+       in->exporting_issued = 0;
+       in->exporting_mseq = 0;
       }
+    }
+    in->caps[mds].seq = m->get_seq();
+    in->caps[mds].issued = m->get_caps();
+    in->caps[mds].mseq = m->get_mseq();
+
+    if (in->exporting_mseq < m->get_mseq()) {
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq()
+             << " IMPORT from mds" << mds << ", clearing exporting_issued " << cap_string(in->exporting_issued) 
+             << " mseq " << in->exporting_mseq << dendl;
+      in->exporting_issued = 0;
+      in->exporting_mseq = 0;
+      in->exporting_mds = -1;
     } else {
-      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other
-             << " - no EXPORT yet, marking" << dendl;
-      in->caps[mds].importing_from.insert(other);
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq()
+             << " IMPORT from mds" << mds << ", keeping exporting_issued " << cap_string(in->exporting_issued) 
+             << " mseq " << in->exporting_mseq << " by mds" << in->exporting_mds << dendl;
     }
-      
-    // fall-thru!
-    // (wake ppl up, etc.)
+    delete m;
+    return;
+  }
+
+  // don't have cap?   
+  //   (it may be that we've reopened the file locally,
+  //    and thus want caps, but don't have a cap yet.
+  //    we should never reply to a cap out of turn.)
+  if (in->caps.count(mds) == 0) {
+    // silently drop.
+    dout(5) << "handle_file_caps on ino " << m->get_ino() 
+            << " seq " << m->get_seq() 
+            << " " << cap_string(m->get_caps()) 
+            << ", don't have this cap, silently dropping." << dendl;
+    delete m;
+    return;
   }
+  
+  // ok!
+  InodeCap &cap = in->caps[mds];
+
 
   // stale?
   if (m->get_op() == CEPH_CAP_OP_EXPORT) {
-    // move to stale list
-    bool found_importing = false;
+    // note
+    bool found_higher_mseq = false;
     for (map<int,InodeCap>::iterator p = in->caps.begin();
         p != in->caps.end();
         p++) {
       if (p->first == mds) continue;
-      if (p->second.importing_from.count(mds) == 0) continue;
-
-      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds
-             << " - already saw IMPORT, done" << dendl;
-      p->second.importing_from.erase(mds);
-      mds_sessions[mds].num_caps--;
-      in->caps.erase(mds);
-      found_importing = true;
-      if (p->second.can_drop()) {
-       mds_sessions[p->first].num_caps--;
-       in->caps.erase(p);
-       if (in->caps.empty())
-         put_inode(in);
+      if (p->second.mseq > m->get_mseq()) {
+       found_higher_mseq = true;
+       dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq() 
+               << " EXPORT from mds" << mds
+               << ", but mds" << p->first << " has higher mseq " << p->second.mseq << dendl;
+       break;
       }
-      break;
-    } 
-    if (!found_importing) {
-      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds
-             << " - no IMPORT yet, marking" << dendl;
-      if (in->caps.empty()) in->get();
-      InodeCap &cap = in->caps[mds];
-      cap.issued_exporting = in->caps[mds].issued;
-      cap.issued = 0;
-      cap.seq = 0;
     }
+    if (!found_higher_mseq) {
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq() 
+             << " EXPORT from mds" << mds
+             << ", setting exporting_issued " << cap_string(cap.issued) << dendl;
+      in->exporting_issued = cap.issued;
+      in->exporting_mseq = m->get_mseq();
+      in->exporting_mds = mds;
+      in->caps.erase(mds);
+      if (in->caps.empty())
+       put_inode(in);
+    }
+
     delete m;
     return;
   }
@@ -1623,23 +1632,6 @@ void Client::handle_file_caps(MClientFileCaps *m)
     return;
   }
 
-  // don't have cap?   
-  //   (it may be that we've reopened the file locally,
-  //    and thus want caps, but don't have a cap yet.
-  //    we should never reply to a cap out of turn.)
-  if (in->caps.count(mds) == 0) {
-    // silently drop.
-    dout(5) << "handle_file_caps on ino " << m->get_ino() 
-            << " seq " << m->get_seq() 
-            << " " << cap_string(m->get_caps()) 
-            << ", don't have this cap, silently dropping." << dendl;
-    delete m;
-    return;
-  }
-  
-
-  // ok!
-  InodeCap &cap = in->caps[mds];
   cap.seq = m->get_seq();
 
   // don't want it?
@@ -1653,13 +1645,9 @@ void Client::handle_file_caps(MClientFileCaps *m)
     m->set_caps(0);
     m->set_wanted(0);
     messenger->send_message(m, m->get_source_inst());
-    cap.seq = 0;
-    if (cap.can_drop()) {
-      mds_sessions[mds].num_caps--;
-      in->caps.erase(mds);
-      if (in->caps.empty())
-       put_inode(in);
-    }      
+
+    // FIXME...
+
     return;
   }
 
@@ -2891,8 +2879,15 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
       dout(7) << " first caps on " << in->inode.ino << dendl;
       in->get();
     }
-    if (in->caps.count(mds) == 0)
+    if (in->caps.count(mds) == 0) {
       mds_sessions[mds].num_caps++;
+      if (in->exporting_mds == mds) {
+       dout(10) << " clearing exporting_caps on " << mds << dendl;
+       in->exporting_mds = -1;
+       in->exporting_issued = 0;
+       in->exporting_mseq = 0;
+      }
+    }
     InodeCap &cap = in->caps[mds];
 
     int new_caps = reply->get_file_caps();
@@ -2911,6 +2906,7 @@ int Client::_open(const filepath &path, int flags, mode_t mode, Fh **fhp, int ui
       cap.issued |= new_caps;
       cap.implemented |= new_caps;
       cap.seq = reply->get_file_caps_seq();
+      cap.mseq = reply->get_file_caps_mseq();
 
       // any caps?
       if (new_caps & ~old_caps)
index d48e4ad01a79f8e28e4a663fef6e129d5f820de2..59f19c3c00c8b5f4e7cc116caf720a24d99779e0 100644 (file)
@@ -129,19 +129,10 @@ class InodeCap {
  public:
   unsigned issued;
   unsigned implemented;
-  unsigned seq;
+  __u64 seq;
+  __u32 mseq;  // migration seq
 
-  int issued_exporting;        // if export comes first
-  set<int> importing_from;    // if import comes first
-
-  InodeCap() : issued(0), implemented(0), seq(0),
-              issued_exporting(0) {}
-  
-  bool can_drop() {
-    return issued == 0 &&
-      issued_exporting == 0 &&
-      importing_from.empty();
-  }
+  InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {}
 };
 
 
@@ -158,6 +149,9 @@ class Inode {
 
   // per-mds caps
   map<int,InodeCap> caps;            // mds -> InodeCap
+  unsigned exporting_issued;
+  int exporting_mds;
+  capseq_t exporting_mseq;
 
   //int open_by_mode[CEPH_FILE_MODE_NUM];
   map<int,int> open_by_mode;
@@ -211,6 +205,7 @@ class Inode {
     //inode(_inode),
     lease_mask(0), lease_mds(-1),
     dir_auth(-1), dir_hashed(false), dir_replicated(false), 
+    exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
     reported_size(0), wanted_max_size(0), requested_max_size(0),
     ref(0), ll_ref(0), 
     dir(0), dn(0), symlink(0),
@@ -244,11 +239,11 @@ class Inode {
   bool put_cap_ref(int cap);
 
   int caps_issued() {
-    int c = 0;
+    int c = exporting_issued;
     for (map<int,InodeCap>::iterator it = caps.begin();
          it != caps.end();
          it++)
-      c |= it->second.issued | it->second.issued_exporting;
+      c |= it->second.issued;
     return c;
   }
 
index 5e62c9b01d3690c8c21f10729a329de1a9098a24..88ec35d5d5f7cb394858937fc848a6787041fefd 100644 (file)
@@ -598,6 +598,7 @@ struct ceph_mds_reply_head {
        __le32 result;
        __le32 file_caps;
        __le32 file_caps_seq;
+       __le32 file_caps_mseq;
        __le32 mdsmap_epoch;
 } __attribute__ ((packed));
 
@@ -716,7 +717,7 @@ struct ceph_mds_file_caps {
        __le32 caps, wanted;
        __le64 ino;
        __le64 size, max_size;
-       __le32 migrate_mds, migrate_seq;
+       __le32 migrate_seq;
        struct ceph_timespec mtime, atime, ctime;
        __le64 time_warp_seq;
 } __attribute__ ((packed));
index ea17da9adf435ff84c8beb3fff01096bbf8d12ca..6fb52ca2bee228776274b4abe1f0c80f3630b86e 100644 (file)
@@ -54,20 +54,20 @@ public:
     int32_t wanted;
     int32_t issued;
     int32_t pending;
-    capseq_t seq;
+    capseq_t mseq;
     Export() {}
-    Export(int w, int i, int p, capseq_t s) : wanted(w), issued(i), pending(p), seq(s) {}
+    Export(int w, int i, int p, capseq_t s) : wanted(w), issued(i), pending(p), mseq(s) {}
     void encode(bufferlist &bl) const {
       ::encode(wanted, bl);
       ::encode(issued, bl);
       ::encode(pending, bl);
-      ::encode(seq, bl);
+      ::encode(mseq, bl);
     }
     void decode(bufferlist::iterator &p) {
       ::decode(wanted, p);
       ::decode(issued, p);
       ::decode(pending, p);
-      ::decode(seq, p);
+      ::decode(mseq, p);
     }
   };
 
@@ -78,6 +78,7 @@ private:
   map<capseq_t, __u32>  cap_history;  // seq -> cap, [last_recv,last_sent]
   capseq_t last_sent, last_recv;
   capseq_t last_open;
+  capseq_t mseq;
   
   bool suppress;
   bool stale;
@@ -90,10 +91,13 @@ public:
     last_sent(s),
     last_recv(s),
     last_open(0),
+    mseq(0),
     suppress(false), stale(false),
     session_caps_item(this) { 
   }
   
+  capseq_t get_mseq() { return mseq; }
+
   capseq_t get_last_open() { return last_open; }
   void set_last_open() { last_open = last_sent; }
 
@@ -177,7 +181,7 @@ public:
   capseq_t get_last_seq() { return last_sent; }
 
   Export make_export() {
-    return Export(wanted_caps, issued(), pending(), last_sent);
+    return Export(wanted_caps, issued(), pending(), mseq+1);
   }
   void merge(Export& other) {
     // issued + pending
@@ -188,7 +192,7 @@ public:
 
     // wanted
     wanted_caps = wanted_caps | other.wanted;
-    last_sent = MAX(last_sent, other.seq);
+    mseq = other.mseq;
   }
   void merge(int otherwanted, int otherissued) {
     // issued + pending
index ac7b3af338b0a330c205227e8b239667ce70a19a..d7f98b5ff9253610137ab56f499fbca779422dce 100644 (file)
@@ -626,7 +626,8 @@ bool Locker::issue_caps(CInode *in)
                                                     in->inode,
                                                     cap->get_last_seq(),
                                                     cap->pending(),
-                                                    cap->wanted()),
+                                                    cap->wanted(),
+                                                    cap->get_mseq()),
                                 it->first);
       }
     }
@@ -647,7 +648,8 @@ void Locker::issue_truncate(CInode *in)
                                                 in->inode,
                                                 cap->get_last_seq(),
                                                 cap->pending(),
-                                                cap->wanted()),
+                                                cap->wanted(),
+                                                cap->get_mseq()),
                             it->first);
   }
 
@@ -905,7 +907,8 @@ void Locker::share_inode_max_size(CInode *in)
                                                   in->inode,
                                                   cap->get_last_seq(),
                                                   cap->pending(),
-                                                  cap->wanted()),
+                                                  cap->wanted(),
+                                                  cap->get_mseq()),
                               client);
     }
   }
index 9a88a8f527ea148bb5007b3fc6fe179369ec4651..de5ff2958fb76b4790bb4b9b758b6e96ee04ea1a 100644 (file)
@@ -2786,13 +2786,13 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t&
   Capability *cap = in->reconnect_cap(client, icr);
   session->touch_cap(cap);
   
-  // send REAP
+  // send IMPORT
   MClientFileCaps *reap = new MClientFileCaps(CEPH_CAP_OP_IMPORT,
                                              in->inode,
                                              cap->get_last_seq(),
                                              cap->pending(),
-                                             cap->wanted());
-  reap->set_migrate_mds(frommds); // reap from whom?
+                                             cap->wanted(),
+                                             cap->get_mseq());
   mds->messenger->send_message(reap, session->inst);
 }
 
index 9776a8c8a8640db58078e5187dd7becf29fa6794..da66547bc23e2ee5738b325a1951bcff6ff39870 100644 (file)
@@ -900,7 +900,8 @@ void Migrator::finish_export_inode_caps(CInode *in)
                                             in->inode, 
                                              cap->get_last_seq(), 
                                              cap->pending(),
-                                             cap->wanted());
+                                             cap->wanted(),
+                                            cap->get_mseq());
     mds->send_message_client(m, it->first);
   }
   in->clear_client_caps();
@@ -2055,7 +2056,7 @@ void Migrator::finish_import_inode_caps(CInode *in, int from,
                                                cap->get_last_seq(),
                                                cap->pending(),
                                                cap->wanted(),
-                                               from);
+                                               cap->get_mseq());
     mds->send_message_client(caps, session->inst);
   }
 
index c1820c41cbbb4611612ace1f2ec5c897fa608c6f..ae6267c1f684cf43c80c2fe0497ed4f3e40c58ec 100644 (file)
@@ -401,7 +401,8 @@ void Server::handle_client_reconnect(MClientReconnect *m)
                                                     fake_inode, 
                                                     0,
                                                     0,                // doesn't matter.
-                                                    p->second.wanted); // doesn't matter.
+                                                    p->second.wanted, // doesn't matter.
+                                                    0);  // FIXME get proper mseq here?  hmm.
        mds->send_message_client(stale, m->get_source_inst());
 
        // add to cap export list.
@@ -4391,6 +4392,7 @@ void Server::_do_open(MDRequest *mdr, CInode *cur)
   MClientReply *reply = new MClientReply(req, 0);
   reply->set_file_caps(cap->pending());
   reply->set_file_caps_seq(cap->get_last_seq());
+  reply->set_file_caps_mseq(cap->get_mseq());
   //reply->set_file_data_version(fdv);
   reply_request(mdr, reply);
 
index 4a28576e6c20cdb70d3e6f9023c6b09e42666385..8c00e9d47ceb681e248a37a23ffe1f9e02f2aa0c 100644 (file)
@@ -39,6 +39,7 @@ class MClientFileCaps : public Message {
   int       get_caps() { return h.caps; }
   int       get_wanted() { return h.wanted; }
   capseq_t  get_seq() { return h.seq; }
+  capseq_t  get_mseq() { return h.migrate_seq; }
 
   inodeno_t get_ino() { return inodeno_t(h.ino); }
   __u64 get_size() { return h.size;  }
@@ -48,8 +49,6 @@ class MClientFileCaps : public Message {
   utime_t get_atime() { return utime_t(h.atime); }
   __u64 get_time_warp_seq() { return h.time_warp_seq; }
 
-  // for cap migration
-  int       get_migrate_mds() { return h.migrate_mds; }
   int       get_migrate_seq() { return h.migrate_seq; }
   int       get_op() { return h.op; }
 
@@ -58,7 +57,6 @@ class MClientFileCaps : public Message {
 
   void set_max_size(__u64 ms) { h.max_size = ms; }
 
-  void set_migrate_mds(int m) { h.migrate_mds = m; }
   void set_migrate_seq(int m) { h.migrate_seq = m; }
   void set_op(int o) { h.op = o; }
 
@@ -72,8 +70,7 @@ class MClientFileCaps : public Message {
                   long seq,
                   int caps,
                   int wanted,
-                  int mmds=0,
-                 int mseq=0) :
+                 int mseq) :
     Message(CEPH_MSG_CLIENT_FILECAPS) {
     h.op = op;
     h.seq = seq;
@@ -82,7 +79,6 @@ class MClientFileCaps : public Message {
     h.ino = inode.ino;
     h.size = inode.size;
     h.max_size = inode.max_size;
-    h.migrate_mds = mmds;
     h.migrate_seq = mseq;
     inode.mtime.encode_timeval(&h.mtime);
     inode.atime.encode_timeval(&h.atime);
@@ -99,8 +95,10 @@ class MClientFileCaps : public Message {
        << " wanted" << cap_string(h.wanted)
        << " size " << h.size << "/" << h.max_size
        << " mtime " << utime_t(h.mtime)
-       << " tws " << h.time_warp_seq
-       << ")";
+       << " tws " << h.time_warp_seq;
+    if (h.migrate_seq)
+      out << " mseq " << h.migrate_seq;
+    out << ")";
   }
   
   void decode_payload() {
index 7604a219ec4412b78b196dda253eee27d258f0ff..cd70d803965c0c71601f5c3c90ede379d271201a 100644 (file)
@@ -228,11 +228,13 @@ class MClientReply : public Message {
 
   unsigned get_file_caps() { return st.file_caps; }
   unsigned get_file_caps_seq() { return st.file_caps_seq; }
+  unsigned get_file_caps_mseq() { return st.file_caps_mseq; }
   //uint64_t get_file_data_version() { return st.file_data_version; }
   
   void set_result(int r) { st.result = r; }
   void set_file_caps(unsigned char c) { st.file_caps = c; }
-  void set_file_caps_seq(long s) { st.file_caps_seq = s; }
+  void set_file_caps_seq(capseq_t s) { st.file_caps_seq = s; }
+  void set_file_caps_mseq(capseq_t s) { st.file_caps_mseq = s; }
   //void set_file_data_version(uint64_t v) { st.file_data_version = v; }
 
   MClientReply() {}
index 73f7b03b845f9f049c8bbfe53400f146928c60e7..e3d0b3db19e8f0bc86b5734a351f76a956158c94 100755 (executable)
@@ -48,8 +48,8 @@ do
 done
 
 # mds
-$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20
-$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20
+$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 10 #--debug_ms 20
+$CEPH_BIN/cmds $ARGS --debug_ms 1 --debug_mds 20 --mds_thrash_fragments 0 --mds_thrash_exports 10 #--debug_ms 20
 ./cmonctl mds set_max_mds 2
 
 echo "started.  stop.sh to stop.  see out/* (e.g. 'tail -f out/????') for debug output."