]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: fixed up cap import/exports. still need to deal with mds failures.
authorSage Weil <sage@newdream.net>
Wed, 4 Jun 2008 23:15:32 +0000 (16:15 -0700)
committerSage Weil <sage@newdream.net>
Thu, 5 Jun 2008 16:58:09 +0000 (09:58 -0700)
src/client/Client.cc
src/client/Client.h
src/client/SyntheticClient.cc
src/mds/Capability.h

index 2e224c06024c4ab8fff1bba0214c6ecaa4ed669a..5789e9f55798f3ae276e12a75799636cc18166d5 100644 (file)
@@ -1198,10 +1198,12 @@ void Client::send_reconnect(int mds)
        dout(10) << " path on " << p->first << " is " << path << dendl;
        m->add_inode_path(p->first, path.get_path());
       }
+      /*
       if (p->second->stale_caps.count(mds)) {
        dout(10) << " clearing stale caps on " << p->first << dendl;
        p->second->stale_caps.erase(mds);         // hrm, is this right?
       }
+      */
     }
 
     // reset my cap seq number
@@ -1359,9 +1361,16 @@ void Client::check_caps(Inode *in)
           << " used " << cap_string(used)
           << dendl;
   
+  if (in->caps.empty())
+    return;   // guard if at end of func
+  
+  map<int,InodeCap>::iterator next;
   for (map<int,InodeCap>::iterator it = in->caps.begin();
        it != in->caps.end();
-       it++) {
+       it = next) {
+    next = it;
+    next++;
+
     InodeCap &cap = it->second;
     int revoking = cap.implemented & ~cap.issued;
     
@@ -1404,12 +1413,16 @@ void Client::check_caps(Inode *in)
     m->set_max_size(in->wanted_max_size);
     in->requested_max_size = in->wanted_max_size;
     messenger->send_message(m, mdsmap->get_inst(it->first));
-    if (wanted == 0)
-      mds_sessions[it->first].num_caps--;
+    if (wanted == 0) {
+      it->second.seq = 0;
+      if (it->second.can_drop()) {
+       mds_sessions[it->first].num_caps--;
+       in->caps.erase(it);
+      }
+    }
   }
 
-  if (wanted == 0 && !in->caps.empty()) {
-    in->caps.clear();
+  if (in->caps.empty()) {
     dout(10) << "last caps on " << *in << dendl;
     put_inode(in);
   }
@@ -1521,68 +1534,69 @@ void Client::handle_file_caps(MClientFileCaps *m)
   if (m->get_op() == CEPH_CAP_OP_IMPORT) {
     int other = m->get_migrate_mds();
 
-    /*
-     * FIXME: there is a race here.. if the caps are exported twice in succession,
-     *  you may get the second import before the first, in which case the middle MDS's
-     *  import and then export won't be handled properly.
-     *  there should be a sequence number attached to the cap, incremented each time
-     *  it is exported... 
-     */
     /*
      * FIXME: handle mds failures
      */
 
-    if (in && in->stale_caps.count(other)) {
-      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " imported from mds" << other << dendl;
+    // fresh from new mds?
+    if (!in->caps.count(mds)) {
+      mds_sessions[mds].num_caps++;
+      if (in->caps.empty()) in->get();
+      in->caps[mds].seq = m->get_seq();
+      in->caps[mds].issued = m->get_caps();
+    }
 
-      // fresh from new mds?
-      if (!in->caps.count(mds)) {
-       mds_sessions[mds].num_caps++;
-        if (in->caps.empty()) in->get();
-        in->caps[mds].seq = m->get_seq();
-        in->caps[mds].issued = m->get_caps();
+    if (in->caps.count(other) &&
+       in->caps[other].issued_exporting) {
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other
+             << " - already saw EXPORT, done" << dendl;
+      in->caps[other].issued_exporting = 0;
+      if (in->caps[other].can_drop()) {
+       mds_sessions[other].num_caps--;
+       in->caps.erase(other);
       }
-      
-      assert(in->stale_caps.count(other));
-      in->stale_caps.erase(other);
-      if (in->stale_caps.empty()) put_inode(in); // note: this will never delete *in
-      
-      // fall-thru!
     } else {
-      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) import from mds" << other << dendl;
-      // delay!
-      cap_reap_queue[in->ino()][other] = m;
-      return;
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other
+             << " - no EXPORT yet, marking" << dendl;
+      in->caps[mds].importing_from.insert(other);
     }
+      
+    // fall-thru!
+    // (wake ppl up, etc.)
   }
 
   // stale?
   if (m->get_op() == CEPH_CAP_OP_EXPORT) {
-    dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now exported/stale" << dendl;
-    
     // move to stale list
-    assert(in->caps.count(mds));
-    if (in->stale_caps.empty()) in->get();
-    in->stale_caps[mds] = in->caps[mds];
-    in->stale_caps[mds].seq = m->get_seq();
-
-    assert(in->caps.count(mds));
-    in->caps.erase(mds);
-    mds_sessions[mds].num_caps--;
-    if (in->caps.empty()) in->put();
-
-    // delayed reap?
-    if (cap_reap_queue.count(in->ino()) &&
-        cap_reap_queue[in->ino()].count(mds)) {
-      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds
-             << " delayed reap on mds?FIXME?" << dendl;  /* FIXME */
-      
-      // process delayed reap
-      handle_file_caps( cap_reap_queue[in->ino()][mds] );
-
-      cap_reap_queue[in->ino()].erase(mds);
-      if (cap_reap_queue[in->ino()].empty())
-        cap_reap_queue.erase(in->ino());
+    bool found_importing = false;
+    for (map<int,InodeCap>::iterator p = in->caps.begin();
+        p != in->caps.end();
+        p++) {
+      if (p->first == mds) continue;
+      if (p->second.importing_from.count(mds) == 0) continue;
+
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds
+             << " - already saw IMPORT, done" << dendl;
+      p->second.importing_from.erase(mds);
+      mds_sessions[mds].num_caps--;
+      in->caps.erase(mds);
+      found_importing = true;
+      if (p->second.can_drop()) {
+       mds_sessions[p->first].num_caps--;
+       in->caps.erase(p);
+       if (in->caps.empty())
+         put_inode(in);
+      }
+      break;
+    } 
+    if (!found_importing) {
+      dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds
+             << " - no IMPORT yet, marking" << dendl;
+      if (in->caps.empty()) in->get();
+      InodeCap &cap = in->caps[mds];
+      cap.issued_exporting = in->caps[mds].issued;
+      cap.issued = 0;
+      cap.seq = 0;
     }
     delete m;
     return;
@@ -1623,10 +1637,12 @@ void Client::handle_file_caps(MClientFileCaps *m)
     return;
   }
   
-  InodeCap &cap = in->caps[mds];
 
+  // ok!
+  InodeCap &cap = in->caps[mds];
+  cap.seq = m->get_seq();
 
-  // don't want?
+  // don't want it?
   int wanted = in->caps_wanted();
   if (wanted == 0) {
     dout(5) << "handle_file_caps on ino " << m->get_ino() 
@@ -1637,6 +1653,13 @@ void Client::handle_file_caps(MClientFileCaps *m)
     m->set_caps(0);
     m->set_wanted(0);
     messenger->send_message(m, m->get_source_inst());
+    cap.seq = 0;
+    if (cap.can_drop()) {
+      mds_sessions[mds].num_caps--;
+      in->caps.erase(mds);
+      if (in->caps.empty())
+       put_inode(in);
+    }      
     return;
   }
 
@@ -1666,7 +1689,6 @@ void Client::handle_file_caps(MClientFileCaps *m)
   }
 
   // update caps
-  cap.seq = m->get_seq();
 
   bool ack = false;
   
index 0653a34ba607d92bd2d98a020be9b145dad7c427..d48e4ad01a79f8e28e4a663fef6e129d5f820de2 100644 (file)
@@ -130,7 +130,18 @@ class InodeCap {
   unsigned issued;
   unsigned implemented;
   unsigned seq;
-  InodeCap() : issued(0), implemented(0), seq(0) {}
+
+  int issued_exporting;        // if export comes first
+  set<int> importing_from;    // if import comes first
+
+  InodeCap() : issued(0), implemented(0), seq(0),
+              issued_exporting(0) {}
+  
+  bool can_drop() {
+    return issued == 0 &&
+      issued_exporting == 0 &&
+      importing_from.empty();
+  }
 };
 
 
@@ -147,7 +158,6 @@ class Inode {
 
   // per-mds caps
   map<int,InodeCap> caps;            // mds -> InodeCap
-  map<int,InodeCap> stale_caps;      // mds -> cap .. stale
 
   //int open_by_mode[CEPH_FILE_MODE_NUM];
   map<int,int> open_by_mode;
@@ -238,11 +248,7 @@ class Inode {
     for (map<int,InodeCap>::iterator it = caps.begin();
          it != caps.end();
          it++)
-      c |= it->second.issued;
-    for (map<int,InodeCap>::iterator it = stale_caps.begin();
-         it != stale_caps.end();
-         it++)
-      c |= it->second.issued;
+      c |= it->second.issued | it->second.issued_exporting;
     return c;
   }
 
@@ -560,10 +566,6 @@ protected:
   Inode*                 root;
   LRU                    lru;    // lru list of Dentry's in our local metadata cache.
 
-  // cap weirdness
-  map<inodeno_t, map<int, class MClientFileCaps*> > cap_reap_queue;  // ino -> mds -> msg .. set of (would-be) stale caps to reap
-
-
   // file handles, etc.
   filepath cwd;
   interval_set<int> free_fd_set;  // unused fds
@@ -604,7 +606,7 @@ protected:
     //cout << "put_inode on " << in << " " << in->inode.ino << endl;
     in->put(n);
     if (in->ref == 0) {
-      //cout << "put_inode deleting " << in->inode.ino << endl;
+      //cout << "put_inode deleting " << in << " " << in->inode.ino << std::endl;
       inode_map.erase(in->inode.ino);
       if (in == root) root = 0;
       delete in;
index 91a29aceb8db08ef6152cfc6663d5a42fc1d6416..b1c7343a73d0206807c4a63cbbfa96c3159b3de0 100644 (file)
@@ -1700,8 +1700,9 @@ int SyntheticClient::create_shared(int num)
 {
   // files
   char d[255];
+  client->mkdir("test", 0755);
   for (int n=0; n<num; n++) {
-    sprintf(d,"file.%d", n);
+    sprintf(d,"test/file.%d", n);
     client->mknod(d, 0644);
   }
   
@@ -1716,14 +1717,14 @@ int SyntheticClient::open_shared(int num, int count)
     // open
     list<int> fds;
     for (int n=0; n<num; n++) {
-      sprintf(d,"file.%d", n);
+      sprintf(d,"test/file.%d", n);
       int fd = client->open(d,O_RDONLY);
       if (fd > 0) fds.push_back(fd);
     }
 
     if (false && client->get_nodeid() == 0)
       for (int n=0; n<num; n++) {
-       sprintf(d,"file.%d", n);
+       sprintf(d,"test/file.%d", n);
        client->unlink(d);
       }
 
index deb6be5a1778e63f39eb8681290bad54744bc2d4..ea17da9adf435ff84c8beb3fff01096bbf8d12ca 100644 (file)
@@ -54,17 +54,20 @@ public:
     int32_t wanted;
     int32_t issued;
     int32_t pending;
+    capseq_t seq;
     Export() {}
-    Export(int w, int i, int p) : wanted(w), issued(i), pending(p) {}
+    Export(int w, int i, int p, capseq_t s) : wanted(w), issued(i), pending(p), seq(s) {}
     void encode(bufferlist &bl) const {
       ::encode(wanted, bl);
       ::encode(issued, bl);
       ::encode(pending, bl);
+      ::encode(seq, bl);
     }
     void decode(bufferlist::iterator &p) {
       ::decode(wanted, p);
       ::decode(issued, p);
       ::decode(pending, p);
+      ::decode(seq, p);
     }
   };
 
@@ -174,7 +177,7 @@ public:
   capseq_t get_last_seq() { return last_sent; }
 
   Export make_export() {
-    return Export(wanted_caps, issued(), pending());
+    return Export(wanted_caps, issued(), pending(), last_sent);
   }
   void merge(Export& other) {
     // issued + pending
@@ -185,6 +188,7 @@ public:
 
     // wanted
     wanted_caps = wanted_caps | other.wanted;
+    last_sent = MAX(last_sent, other.seq);
   }
   void merge(int otherwanted, int otherissued) {
     // issued + pending