]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
multiset pins (with counters); some fixes with migration (mostly dirfrag_t fallout...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 9 Mar 2007 21:10:34 +0000 (21:10 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 9 Mar 2007 21:10:34 +0000 (21:10 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1192 29311d96-e01e-0410-9327-a35deaab8ce9

20 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/include/Context.h
branches/sage/cephmds2/mds/CDentry.cc
branches/sage/cephmds2/mds/CDentry.h
branches/sage/cephmds2/mds/CDir.cc
branches/sage/cephmds2/mds/CDir.h
branches/sage/cephmds2/mds/CInode.cc
branches/sage/cephmds2/mds/CInode.h
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDLog.cc
branches/sage/cephmds2/mds/MDS.cc
branches/sage/cephmds2/mds/Migrator.cc
branches/sage/cephmds2/mds/Migrator.h
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/Server.h
branches/sage/cephmds2/mds/events/EMetaBlob.h
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/mds/mdstypes.h
branches/sage/cephmds2/messages/MExportDir.h
branches/sage/cephmds2/messages/MExportDirPrep.h

index 79577772c1fdcba788feec601f2c6a4300bfb036..b06dcc1ac9c4d7db6b6317149f3528f19ff934dd 100644 (file)
@@ -26,6 +26,9 @@ mds
 - fix mds initial osdmap weirdness (which will currently screw up on standby -> almost anything)
 - incremental mdsmaps
 - client failure
+- dirfrag split
+  - make sure we are freezing _before_ we fetch to complete the dirfrag, else 
+    we break commit()'s preconditions when it fetches an incomplete dir.
 
 - EMetablob should return 'expired' if they have higher versions (and are thus described by a newer journal entry)
 - dir version/committed/etc versus migration, log expires.  
@@ -48,7 +51,7 @@ mds
   - will need to ditch 10s client metadata caching before this is useful
   - implement truncate
 - statfs?
-- btree directories (for efficient large directories)
+? btree directories (for efficient large directories)
 - consistency points/snapshots
 
 - fix MExportAck and others to use dir+dentry, not inode
index 78059b8d39d82ecbf2fd63d750f4b4549a4184fe..b2d1113dc22ffbdf35b2fd45ccbbb3d325bd208a 100644 (file)
@@ -56,6 +56,12 @@ inline void finish_contexts(std::list<Context*>& finished,
   }
 }
 
+class C_NoopContext : public Context {
+public:
+  void finish(int r) { }
+};
+
+
 /*
  * C_Contexts - set of Contexts
  */
index fcdea1bc90b0a3a42b013822fcdcc0e66fe2beef..0f0fce7df40a7b2c8787838394f54020d50a6ef4 100644 (file)
@@ -60,10 +60,7 @@ ostream& operator<<(ostream& out, CDentry& dn)
 
   if (dn.get_num_ref()) {
     out << " |";
-    for(set<int>::iterator it = dn.get_ref_set().begin();
-        it != dn.get_ref_set().end();
-        it++)
-      out << " " << CDentry::pin_name(*it);
+    dn.print_pin_set(out);
   }
 
   out << " " << &dn;
index ac6c5ffb8cfde2edbb96d9a77944362d2e0c692a..49ed6dc9cfb536e71a290b3364bf314a612f18b7 100644 (file)
@@ -52,7 +52,7 @@ class CDentry : public MDSCacheObject, public LRUObject {
   static const int PIN_DIRTY = 2;      //
   static const int PIN_PROXY = 3;      //
   static const int PIN_XLOCK = 4;
-  static const char *pin_name(int p) {
+  const char *pin_name(int p) {
     switch (p) {
     case PIN_INODEPIN: return "inodepin";
     case PIN_REPLICATED: return "replicated";
index da95e4f6c6799732d8575ea321c7a155c3b4a2f2..c01aee7b7c0a172378ca5ca772f82992c6e8d3a9 100644 (file)
@@ -77,10 +77,7 @@ ostream& operator<<(ostream& out, CDir& dir)
   
   if (dir.get_num_ref()) {
     out << " |";
-    for(set<int>::iterator it = dir.get_ref_set().begin();
-        it != dir.get_ref_set().end();
-        it++)
-      out << " " << CDir::pin_name(*it);
+    dir.print_pin_set(out);
   }
 
   out << " " << &dir;
@@ -111,8 +108,6 @@ CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth)
   committing_version = 0;
   committed_version = 0;
 
-  ref = 0;
-
   // dir_auth
   dir_auth = CDIR_AUTH_DEFAULT;
 
@@ -120,12 +115,6 @@ CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth)
   assert(in->is_dir());
   if (auth) 
     state |= STATE_AUTH;
-  /*
-  if (in->dir_is_hashed()) {
-    assert(0);                      // when does this happen?  
-    state |= STATE_HASHED;
-  }
-  */
  
   auth_pins = 0;
   nested_auth_pins = 0;
@@ -777,13 +766,46 @@ void CDir::_fetched(bufferlist &bl)
 // -----------------------
 // COMMIT
 
+/**
+ * commit
+ *
+ * @param want min version i want committed
+ * @param c callback for completion
+ */
+void CDir::commit(version_t want, Context *c)
+{
+  dout(10) << "commit want " << want << " on " << *this << endl;
+  if (want == 0) want = version;
+
+  // preconditions
+  assert(want <= version);          // can't commit the future
+  assert(committed_version < want); // the caller is stupid
+  assert(is_auth());
+  assert(can_auth_pin());
+
+  // note: queue up a noop if necessary, so that we always
+  // get an auth_pin.
+  if (!c)
+    c = new C_NoopContext;
+
+  // auth_pin on first waiter
+  if (waiting_for_commit.empty())
+    auth_pin();
+  waiting_for_commit[want].push_back(c);
+  
+  // ok.
+  _commit(want);
+}
+
+
 class C_Dir_RetryCommit : public Context {
   CDir *dir;
   version_t want;
 public:
-  C_Dir_RetryCommit(CDir *d, version_t v) : dir(d), want(v) { }
+  C_Dir_RetryCommit(CDir *d, version_t v) : 
+    dir(d), want(v) { }
   void finish(int r) {
-    dir->commit(want, 0);
+    dir->_commit(want);
   }
 };
 
@@ -797,48 +819,26 @@ public:
   }
 };
 
-/**
- * commit
- *
- * @param want min version i want committed
- * @param c callback for completion
- */
-void CDir::commit(version_t want, Context *c)
+void CDir::_commit(version_t want)
 {
-  dout(10) << "commit want " << want << " on " << *this << endl;
-  if (want == 0) want = version;
-  
-  if (c) {
-    assert(committed_version < want);
-    waiting_for_commit[want].push_back(c);
-  }
+  dout(10) << "_commit want " << want << " on " << *this << endl;
+
+  // we can't commit things in the future.
+  // (even the projected future.)
+  assert(want <= version);
+
+  // check pre+postconditions.
+  assert(is_auth());
 
-  // not auth?
-  if (!is_auth()) {
-    dout(10) << "not auth.  must have exported.  kicking all waiters." << endl;
-    for (map<version_t, list<Context*> >::iterator p = waiting_for_commit.begin();
-        p != waiting_for_commit.end();
-        ++p) 
-      cache->mds->queue_finished(p->second);
-    waiting_for_commit.clear();
-    return;
-  }
   // already committed?
   if (committed_version >= want) {
     dout(10) << "already committed " << committed_version << " >= " << want << endl;
     return;
   }
+  // already committing >= want?
   if (committing_version >= want) {
     dout(10) << "already committing " << committing_version << " >= " << want << endl;
-    return;
-  }
-
-  // authpinnable?
-  if (!can_auth_pin()) {
-    dout(7) << "can't auth_pin, waiting" << endl;
-    add_waiter(WAIT_AUTHPINNABLE,
-              new C_Dir_RetryCommit(this, want));
+    assert(state_test(STATE_COMMITTING));
     return;
   }
   
@@ -849,12 +849,14 @@ void CDir::commit(version_t want, Context *c)
     return;
   }
   
-  // pin.
-  auth_pin();
-
   // commit.
-  state_set(CDir::STATE_COMMITTING);
   committing_version = version;
+
+  // mark committing (if not already)
+  if (!state_test(STATE_COMMITTING)) {
+    dout(10) << "marking committing" << endl;
+    state_set(STATE_COMMITTING);
+  }
   
   if (cache->mds->logger) cache->mds->logger->inc("cdir");
 
@@ -933,13 +935,14 @@ void CDir::_committed(version_t v)
   assert(v <= committing_version);
   committed_version = v;
 
+  // _all_ commits done?
+  if (committing_version == committed_version) 
+    state_clear(CDir::STATE_COMMITTING);
+  
   // dir clean?
   if (committed_version == version) 
     mark_clean();
 
-  if (committing_version == committed_version) 
-    state_clear(CDir::STATE_COMMITTING);
-
   // dentries clean?
   for (CDir_map_t::iterator it = items.begin();
        it != items.end(); ) {
@@ -974,19 +977,23 @@ void CDir::_committed(version_t v)
     }
   }
 
-  // unpin
-  auth_unpin();
-
   // finishers?
+  bool were_waiters = !waiting_for_commit.empty();
+  
   map<version_t, list<Context*> >::iterator p = waiting_for_commit.begin();
   while (p != waiting_for_commit.end()) {
     map<version_t, list<Context*> >::iterator n = p;
     n++;
-    if (p->first > committed_version) break; // haven't commit this far yet.
+    if (p->first > committed_version) break; // haven't committed this far yet.
     cache->mds->queue_finished(p->second);
     waiting_for_commit.erase(p);
     p = n;
   } 
+
+  // unpin if we kicked the last waiter.
+  if (were_waiters &&
+      waiting_for_commit.empty())
+    auth_unpin();
 }
 
 
index 467eea5b005de5955514eead9d8e75801bd790da..d6065c0315691f3f24b090a7e90ed1ccd76c0507 100644 (file)
@@ -84,7 +84,7 @@ class CDir : public MDSCacheObject {
   static const int PIN_DIRTY =    15;
   static const int PIN_REQUEST =  16;
   static const int PIN_LOGGINGEXPORTFINISH = 17;
-  static const char *pin_name(int p) {
+  const char *pin_name(int p) {
     switch (p) {
     case PIN_CHILD: return "child";
     case PIN_OPENED: return "opened";
@@ -192,11 +192,8 @@ class CDir : public MDSCacheObject {
   // context
   MDCache  *cache;
 
-  // my inode
-  CInode          *inode;
-
-  // my frag
-  frag_t           frag;
+  CInode          *inode;  // my inode
+  frag_t           frag;   // my frag
 
  protected:
   // contents
@@ -207,7 +204,7 @@ class CDir : public MDSCacheObject {
   // state
   version_t       version;
   version_t       committing_version;
-  version_t       committed_version;   // slight lie; we bump this on import.
+  version_t       committed_version;
   version_t       projected_version; 
 
   // lock nesting, freeze
@@ -215,14 +212,6 @@ class CDir : public MDSCacheObject {
   int        nested_auth_pins;
   int        request_pins;
 
-  // hashed dirs
-  set<int>   hashed_subset;  // HASHING: subset of mds's that are hashed
- public:
-  // for class MDS
-  map<int, pair< list<class InodeStat*>, list<string> > > hashed_readdir;
- protected:
-
-
 
   // waiters
   multimap<int, Context*> waiting;  // tag -> context
@@ -252,7 +241,7 @@ class CDir : public MDSCacheObject {
 
 
   // -- accessors --
-  inodeno_t ino()     { return inode->ino(); }
+  inodeno_t ino()     { return inode->ino(); }          // deprecate me?
   dirfrag_t dirfrag() { return dirfrag_t(inode->ino(), frag); }
 
   CInode *get_inode()    { return inode; }
@@ -375,7 +364,8 @@ class CDir : public MDSCacheObject {
   map<version_t, list<Context*> > waiting_for_commit;
 
   void commit_to(version_t want);
-  void commit(version_t want,Context *c);
+  void commit(version_t want, Context *c);
+  void _commit(version_t want);
   void _committed(version_t v);
   void wait_for_commit(Context *c, version_t v=0);
 
@@ -432,7 +422,7 @@ class CDir : public MDSCacheObject {
 
 
   // -- auth pins --
-  bool can_auth_pin() { return !(is_frozen() || is_freezing()); }
+  bool can_auth_pin() { return is_auth() && !(is_frozen() || is_freezing()); }
   int is_auth_pinned() { return auth_pins; }
   int get_cum_auth_pins() { return auth_pins + nested_auth_pins; }
   int get_auth_pins() { return auth_pins; }
@@ -559,6 +549,7 @@ class CDirExport {
     long        nitems; // actual real entries
     long        nden;   // num dentries (including null ones)
     version_t   version;
+    version_t   committed_version;
     unsigned    state;
     meta_load_t popularity_justme;
     meta_load_t popularity_curdom;
@@ -578,6 +569,7 @@ class CDirExport {
     st.nitems = dir->nitems;
     st.nden = dir->items.size();
     st.version = dir->version;
+    st.committed_version = dir->committed_version;
     st.state = dir->state;
     st.dir_rep = dir->dir_rep;
 
@@ -599,8 +591,8 @@ class CDirExport {
     //dir->nitems = st.nitems;
 
     // set committed_version at old version
-    dir->committing_version = dir->committed_version = st.version;
-    dir->projected_version = dir->version = st.version;    // this is bumped, below, if dirty
+    dir->committing_version = dir->committed_version = st.committed_version;
+    dir->projected_version = dir->version = st.version;
 
     // twiddle state
     if (dir->state & CDir::STATE_HASHED) 
@@ -627,9 +619,6 @@ class CDirExport {
       dir->get(CDir::PIN_OPENED);
     if (dir->is_dirty()) {
       dir->get(CDir::PIN_DIRTY);  
-
-      // bump dir version + 1 if dirty
-      dir->projected_version = dir->version = st.version + 1;
     }
   }
 
index c1951cd6d972f52ddb13ccd84ccc56a880dd12de..e1cbe25bacdff5dd48b2b8d2cc00c31f4cf16488 100644 (file)
@@ -57,10 +57,7 @@ ostream& operator<<(ostream& out, CInode& in)
 
   if (in.get_num_ref()) {
     out << " |";
-    for(set<int>::iterator it = in.get_ref_set().begin();
-        it != in.get_ref_set().end();
-        it++)
-      out << " " << CInode::pin_name(*it);
+    in.print_pin_set(out);
   }
 
   // hack: spit out crap on which clients have caps
@@ -81,11 +78,10 @@ ostream& operator<<(ostream& out, CInode& in)
 
 
 // ====== CInode =======
-CInode::CInode(MDCache *c, bool auth) {
+CInode::CInode(MDCache *c, bool auth) 
+{
   mdcache = c;
 
-  ref = 0;
-  
   //num_parents = 0;
   parent = NULL;
   
@@ -93,7 +89,7 @@ CInode::CInode(MDCache *c, bool auth) {
 
   auth_pins = 0;
   nested_auth_pins = 0;
-  num_request_pins = 0;
+  //num_request_pins = 0;
 
   state = 0;  
 
@@ -242,8 +238,10 @@ CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
 {
   assert(is_dir());
   if (1) { // old
-    if (!dir)
+    if (!dir) {
+      assert(is_auth());
       dir = new CDir(this, fg, mdcache, true);
+    }
     return dir;
   } else { // new
     // have it?
index 269e25b40338475e87d3b3fad0d3a48ef80c62b3..0a289af22d659ce44d3f4c3083aef3814eac5f97 100644 (file)
@@ -59,15 +59,15 @@ class CInode : public MDSCacheObject {
   static const int PIN_WAITER =     6;  // waiter
   static const int PIN_CAPS =       7;  // local fh's
   static const int PIN_AUTHPIN =    8;
-  static const int PIN_IMPORTING =  9;  // multipurpose, for importing
-  static const int PIN_REQUEST =   10;  // request is logging, finishing
+  static const int PIN_IMPORTING =  -9;  // importing
+  static const int PIN_REQUEST =   -10;  // request is logging, finishing
   static const int PIN_RENAMESRC = 11;  // pinned on dest for foreign rename
   static const int PIN_ANCHORING = 12;
   static const int PIN_OPENINGDIR = 13;
   static const int PIN_REMOTEPARENT = 14;
   static const int PIN_DENTRYLOCK = 15;
 
-  static const char *pin_name(int p) {
+  const char *pin_name(int p) {
     switch (p) {
     case PIN_CACHED: return "cached";
     case PIN_DIR: return "dir";
@@ -177,7 +177,7 @@ class CInode : public MDSCacheObject {
   // distributed caching (old)
   pair<int,int> dangling_auth;    // explicit auth, when dangling.
 
-  int           num_request_pins;
+  //int           num_request_pins;
 
   // waiters
   multimap<int, Context*>  waiting;
@@ -431,13 +431,15 @@ protected:
      linked to an active_request, so they're automatically cleaned
      up when a request is finished.  pin at will! */
   void request_pin_get() {
-    if (num_request_pins == 0) get(PIN_REQUEST);
-    num_request_pins++;
+    //if (num_request_pins == 0) 
+    get(PIN_REQUEST);
+    //num_request_pins++;
   }
   void request_pin_put() {
-    num_request_pins--;
-    if (num_request_pins == 0) put(PIN_REQUEST);
-    assert(num_request_pins >= 0);
+    //num_request_pins--;
+    //if (num_request_pins == 0) 
+    put(PIN_REQUEST);
+    //assert(num_request_pins >= 0);
   }
 
   void bad_put(int by) {
index 3046524765cd62f9911b3332ca4c67b6a0d8d387..6c23a634957913fd78b06e150cf4d74793f46d4b 100644 (file)
@@ -417,7 +417,9 @@ void MDCache::try_subtree_merge_at(CDir *dir)
 void MDCache::adjust_bounded_subtree_auth(CDir *dir, set<CDir*>& bounds, pair<int,int> auth)
 {
   dout(7) << "adjust_bounded_subtree_auth " << dir->get_dir_auth() << " -> " << auth
-         << " on " << *dir << endl;
+         << " on " << *dir
+         << " bounds " << bounds
+         << endl;
 
   show_subtrees();
 
@@ -3409,7 +3411,8 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
         m->get_dir(i).update_dir(ndir);
 
        // is this a dir_auth delegation boundary?
-       if (m->get_source().num() != cur->authority().first)
+       if (m->get_source().num() != cur->authority().first ||
+           cur->ino() == 1)
          adjust_subtree_auth(ndir, m->get_source().num());
        
         dout(7) << "added " << *ndir << " nonce " << ndir->replica_nonce << endl;
index b6de2a4c5da2e56d52aea323bfdeb381cd90fcf5..f030d8b76c286481ba3b7b0cfe4b4a470962744b 100644 (file)
@@ -238,8 +238,14 @@ void MDLog::_did_read()
 
 void MDLog::_trimmed(LogEvent *le) 
 {
+  // successful trim?
+  if (!le->has_expired(mds)) {
+    dout(7) << "retrimming : " << le->get_start_off() << " : " << *le << endl;
+    le->expire(mds, new C_MDL_Trimmed(this, le));
+    return;
+  }
+
   dout(7) << "trimmed : " << le->get_start_off() << " : " << *le << endl;
-  assert(le->has_expired(mds));
 
   if (trimming.begin()->first == le->_end_off) {
     // we trimmed off the front!  
index c790369e40cfc8557fde97961b21aa97ce10669d..d0f58817b314c940fad692f80591996c7917630e 100644 (file)
@@ -1007,8 +1007,8 @@ void MDS::my_dispatch(Message *m)
 
     list<CDir*> ls;
     p->second->get_dirfrags(ls);
+    if (ls.empty()) continue;                // must be an open dir.
     CDir *dir = ls.front();
-    if (!dir) continue;                      // must be a dir.
     if (!dir->get_parent_dir()) continue;    // must be linked.
     if (!dir->is_auth()) continue;           // must be auth.
 
index 18368d7fb6135d2d6d66a5bdf35273c0923f8e21..4b9a3517ac676b1b08b833f078dd0b8ff8d6cc5b 100644 (file)
@@ -1028,7 +1028,6 @@ void Migrator::export_reverse(CDir *dir)
   }
 
   // re-import the metadata
-  list<dirfrag_t> imported_subdirs;
   int num_imported_inodes = 0;
 
   for (list<bufferlist>::iterator p = export_data[dir].begin();
@@ -1038,7 +1037,6 @@ void Migrator::export_reverse(CDir *dir)
       decode_import_dir(*p, 
                        export_peer[dir], 
                        dir,                 // import root
-                       imported_subdirs,
                       0);
   }
 
@@ -1404,8 +1402,8 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
     // open export dirs/bounds?
     assert(import_bound_inos.count(dir->dirfrag()) == 0);
     import_bound_inos[dir->dirfrag()].clear();
-    for (list<dirfrag_t>::iterator it = m->get_exports().begin();
-         it != m->get_exports().end();
+    for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
+         it != m->get_bounds().end();
          it++) {
       dout(7) << "  checking dir " << hex << *it << dec << endl;
       CInode *in = cache->get_inode(it->ino);
@@ -1426,19 +1424,19 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
   }
   
 
-  // verify we have all exports
+  // verify we have all bounds
   int waiting_for = 0;
-  for (list<dirfrag_t>::iterator it = m->get_exports().begin();
-       it != m->get_exports().end();
+  for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
+       it != m->get_bounds().end();
        it++) {
     dirfrag_t df = *it;
-    CDir *dir = cache->get_dirfrag(df);
-    if (dir) {
-      if (!dir->state_test(CDir::STATE_IMPORTBOUND)) {
-        dout(7) << "  pinning import bound " << *dir << endl;
-        dir->get(CDir::PIN_IMPORTBOUND);
-        dir->state_set(CDir::STATE_IMPORTBOUND);
-       import_bounds[dir].insert(dir);
+    CDir *bound = cache->get_dirfrag(df);
+    if (bound) {
+      if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
+        dout(7) << "  pinning import bound " << *bound << endl;
+        bound->get(CDir::PIN_IMPORTBOUND);
+        bound->state_set(CDir::STATE_IMPORTBOUND);
+       import_bounds[dir].insert(bound);
       } else {
         dout(7) << "  already pinned import bound " << *dir << endl;
       }
@@ -1485,17 +1483,12 @@ class C_MDS_ImportDirLoggedStart : public Context {
   Migrator *migrator;
   CDir *dir;
   int from;
-  list<dirfrag_t> imported_subdirs;
-  list<dirfrag_t> exports;
 public:
-  C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f, 
-                            list<dirfrag_t>& is, list<dirfrag_t>& e) :
+  C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) :
     migrator(m), dir(d), from(f) {
-    imported_subdirs.swap(is);
-    exports.swap(e);
   }
   void finish(int r) {
-    migrator->import_logged_start(dir, from, imported_subdirs, exports);
+    migrator->import_logged_start(dir, from);
   }
 };
 
@@ -1511,7 +1504,7 @@ void Migrator::handle_export_dir(MExportDir *m)
   cache->show_subtrees();
 
   // start the journal entry
-  EImportStart *le = new EImportStart(dir->dirfrag(), m->get_exports());
+  EImportStart *le = new EImportStart(dir->dirfrag(), m->get_bounds());
   le->metablob.add_dir_context(dir);
   
   // adjust auth (list us _first_)
@@ -1519,7 +1512,6 @@ void Migrator::handle_export_dir(MExportDir *m)
   cache->verify_subtree_bounds(dir, import_bounds[dir]);
 
   // add this crap to my cache
-  list<dirfrag_t> imported_subdirs;
   int num_imported_inodes = 0;
 
   for (list<bufferlist>::iterator p = m->get_dirstate().begin();
@@ -1529,11 +1521,9 @@ void Migrator::handle_export_dir(MExportDir *m)
       decode_import_dir(*p, 
                        oldauth, 
                        dir,                 // import root
-                       imported_subdirs,
                       le);
   }
-  dout(10) << " " << imported_subdirs.size() << " imported subdirs" << endl;
-  dout(10) << " " << m->get_exports().size() << " imported nested exports" << endl;
+  dout(10) << " " << m->get_bounds().size() << " imported bounds" << endl;
   
   // include bounds in EImportStart
   for (set<CDir*>::iterator it = import_bounds[dir].begin();
@@ -1553,8 +1543,7 @@ void Migrator::handle_export_dir(MExportDir *m)
 
   // log it
   mds->mdlog->submit_entry(le,
-                          new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num(), 
-                                                         imported_subdirs, m->get_exports()));
+                          new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num()));
 
   // note state
   import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
@@ -1703,9 +1692,7 @@ void Migrator::import_reverse_unpin(CDir *dir)
 }
 
 
-void Migrator::import_logged_start(CDir *dir, int from,
-                                  list<dirfrag_t> &imported_subdirs,
-                                  list<dirfrag_t> &exports)
+void Migrator::import_logged_start(CDir *dir, int from) 
 {
   dout(7) << "import_logged " << *dir << endl;
 
@@ -1866,7 +1853,6 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int ol
 int Migrator::decode_import_dir(bufferlist& bl,
                               int oldauth,
                               CDir *import_root,
-                              list<dirfrag_t>& imported_subdirs,
                               EImportStart *le)
 {
   int off = 0;
@@ -1882,10 +1868,6 @@ int Migrator::decode_import_dir(bufferlist& bl,
   
   dout(7) << "decode_import_dir " << *dir << endl;
 
-  // add to list
-  if (dir != import_root)
-    imported_subdirs.push_back(dir->dirfrag());
-
   // assimilate state
   dstate.update_dir( dir );
 
index 5a688701262c0d66a7dfdcd1a57c9bfc88330b72..1caa8bb0f32f2d993ccb23d83454a33d92869236 100644 (file)
@@ -202,7 +202,6 @@ public:
   int decode_import_dir(bufferlist& bl,
                        int oldauth,
                        CDir *import_root,
-                       list<dirfrag_t>& imported_subdirs,
                        EImportStart *le);
   /*
   void got_hashed_replica(CDir *import,
@@ -215,9 +214,7 @@ protected:
   void import_reverse_unfreeze(CDir *dir);
   void import_reverse_unpin(CDir *dir);
   void import_notify_abort(CDir *dir);
-  void import_logged_start(CDir *dir, int from,
-                              list<dirfrag_t> &imported_subdirs,
-                              list<dirfrag_t> &exports);
+  void import_logged_start(CDir *dir, int from);
   void handle_export_finish(MExportDirFinish *m);
 public:
   void import_finish(CDir *dir, bool now=false);
index 6135aaadc83f34e6bb1fe4587037fcb48d4aa275..6ab130907ef6099d0c4df3c80e5fd91d6b187227 100644 (file)
@@ -527,19 +527,60 @@ void Server::dispatch_request(Message *m, CInode *ref)
 
 CDir* Server::try_open_dir(CInode *in, frag_t fg, MClientRequest *req)
 {
-  if (!in->get_dirfrag(fg) && in->is_frozen_dir()) {
-    // doh!
-    dout(10) << " dir inode is frozen, can't open dir, waiting " << *in << endl;
+  CDir *dir = in->get_dirfrag(fg);
+  if (dir) 
+    return dir; 
+
+  if (in->is_frozen_dir()) {
+    dout(10) << "try_open_dir: dir inode is frozen, waiting " << *in << endl;
     assert(in->get_parent_dir());
     in->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE,
                                      new C_MDS_RetryRequest(mds, req, in));
     return 0;
   }
-  
+
   return in->get_or_open_dirfrag(mds->mdcache, fg);
 }
 
+CDir* Server::try_open_auth_dir(CInode *diri, frag_t fg, MClientRequest *req)
+{
+  CDir *dir = diri->get_dirfrag(fg);
 
+  // not open and inode not mine?
+  if (!dir && !diri->is_auth()) {
+    int inauth = diri->authority().first;
+    dout(7) << "try_open_auth_dir: not open, not inode auth, fw to mds" << inauth << endl;
+    mdcache->request_forward(req, inauth);
+    return 0;
+  }
+
+  // not open and inode frozen?
+  if (!dir && diri->is_frozen_dir()) {
+    dout(10) << "try_open_dir: dir inode is frozen, waiting " << *diri << endl;
+    assert(diri->get_parent_dir());
+    diri->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE,
+                                      new C_MDS_RetryRequest(mds, req, diri));
+    return 0;
+  }
+
+  // invent?
+  if (!dir) {
+    assert(diri->is_auth());
+    dir = diri->get_or_open_dirfrag(mds->mdcache, fg);
+  }
+  assert(dir);
+  // am i auth for the dirfrag?
+  if (!dir->is_auth()) {
+    int auth = dir->authority().first;
+    dout(7) << "try_open_auth_dir: not auth for " << *dir
+           << ", fw to mds" << auth << endl;
+    mdcache->request_forward(req, auth);
+    return 0;
+  }
+
+  return dir;
+}
 
 
 
@@ -830,22 +871,8 @@ void Server::handle_client_readdir(MClientRequest *req,
     return;
   }
   
-  // get the dir?
-  if (!diri->get_dirfrag(fg) && !diri->is_auth()) {
-    dout(10) << "not auth for " << fg << " or the inode " << *diri << ", fwd" << endl;
-    mdcache->request_forward(req, diri->authority().first);
-    return;
-  }
-
-  CDir *dir = try_open_dir(diri, fg, req);
+  CDir *dir = try_open_auth_dir(diri, fg, req);
   if (!dir) return;
-  assert(dir);
-
-  if (!dir->is_auth()) {
-    dout(10) << "not auth for " << *dir << ", fwd" << endl;
-    mdcache->request_forward(req, dir->authority().first);
-    return;
-  }
 
   // ok!
   assert(dir->is_auth());
@@ -970,32 +997,11 @@ CDir *Server::validate_new_dentry_dir(MClientRequest *req, CInode *diri, string&
 
   // which dirfrag?
   frag_t fg = diri->pick_dirfrag(name);
-  CDir *dir = diri->get_dirfrag(fg);
-
-  // not open?
-  if (!dir && !diri->is_auth()) {
-    int dirauth = diri->authority().first;
-    dout(7) << "validate_new_dentry_dir: don't know dir auth, not open, auth is i think mds" << dirauth << endl;
-    mdcache->request_forward(req, dirauth);
-    return false;
-  }
-  
-  // not me?
-  if (dir && !dir->is_auth()) {
-    int auth = dir->authority().first;
-    dout(7) << "validate_new_dentry_dir on " << req->get_path() << ", dentry " << *dir
-           << " dn " << name
-           << " not mine, fw to mds" << auth << endl;
-    mdcache->request_forward(req, auth);
-    return false;
-  }
 
-  // ok, let's open it then.
-  assert(diri->is_auth());
-  dir = try_open_dir(diri, fg, req);
+  CDir *dir = try_open_auth_dir(diri, fg, req);
   if (!dir)
-    return false;
-  
+    return 0;
+
   // dir auth pinnable?
   if (!dir->can_auth_pin()) {
     dout(7) << "validate_new_dentry_dir: dir " << *dir << " not pinnable, waiting" << endl;
@@ -1386,19 +1392,12 @@ void Server::handle_client_unlink(MClientRequest *req,
     return;
   }
 
-  // am i not open, not auth?
-  frag_t fg = diri->pick_dirfrag(name);
-  if (!diri->get_dirfrag(fg) && !diri->is_auth()) {
-    int dirauth = diri->authority().first;
-    dout(7) << "don't know dir auth, not open, auth is i think " << dirauth << endl;
-    mdcache->request_forward(req, dirauth);
-    return;
-  }
-  
-  CDir *dir = try_open_dir(diri, fg, req);
+  // get the dir, if it's not frozen etc.
+  CDir *dir = validate_new_dentry_dir(req, diri, name);
   if (!dir) return;
+  // ok, it's auth, and authpinnable.
 
-  // does it exist?
+  // does the dentry exist?
   CDentry *dn = dir->lookup(name);
   if (!dn) {
     if (!dir->is_complete()) {
@@ -1676,7 +1675,7 @@ void Server::handle_client_rename(MClientRequest *req,
     return;
   }
 
-  CDir *srcdir = try_open_dir(srcdiri, srcfg, req);
+  CDir *srcdir = try_open_auth_dir(srcdiri, srcfg, req);
   if (!srcdir) return;
   dout(7) << "handle_client_rename srcdir is " << *srcdir << endl;
   
index de52d4d513d9950a8003256cf68f3b2aa27a1b91..719bdf11aa03c2302d14991e13a9bf6eaa662957 100644 (file)
@@ -52,6 +52,7 @@ public:
                       LogEvent *event2 = 0);
   
   CDir *try_open_dir(CInode *in, frag_t fg, MClientRequest *req);
+  CDir* try_open_auth_dir(CInode *diri, frag_t, MClientRequest *req);
 
 
   // clients
index d1a63a111855b9e3810b71edb0eda0a85798a7bc..4d5196c0bf6efe79da12d03495555669cf1d469a 100644 (file)
@@ -321,8 +321,11 @@ class EMetaBlob {
   }
   
   void print(ostream& out) const {
-    out << "[metablob " << lump_order.front()
-       << ", " << lump_map.size() << " dirs]";
+    if (lump_order.empty())
+      out << "[metablob empty]";
+    else
+      out << "[metablob " << lump_order.front()
+         << ", " << lump_map.size() << " dirs]";
   }
 
   bool has_expired(MDS *mds);
index 56b5839df169f064783aff99f7e2dac540e4e499..13b9b2843e03795e39036ff99a2c590f70f400de 100644 (file)
@@ -64,9 +64,13 @@ void EString::replay(MDS *mds)
  * 
  * - been safely committed to its dirslice.
  *
- * - has been safely exported. note that !is_auth() && !is_proxy()
- * implies safely exported.  if !is_auth() && is_proxy(), we need to
- * add a waiter for the export to complete.
+ * - has been safely exported.  i.e., authority().first != us.  
+ *   in particular, auth of <us, them> is not enough, we need to
+ *   wait for <them,-2>.  
+ *
+ * note that this check is overly conservative, in that we'll
+ * try to flush the dir again if we reimport the subtree, even though
+ * later journal entries contain the same dirty data (from the import).
  *
  */
 bool EMetaBlob::has_expired(MDS *mds)
@@ -81,30 +85,36 @@ bool EMetaBlob::has_expired(MDS *mds)
 
     // FIXME: check the slice only
 
-    if (dir->is_proxy()) {
-      dout(10) << "EMetaBlob.has_expired am proxy, needed dirv " << lp->second.dirv
-              << " for " << *dir << endl;
-      return false;      // we need to wait until the export flushes!
-    }
-    if (!dir->is_auth()) {
+    if (dir->authority().first != mds->get_nodeid()) {
       dout(10) << "EMetaBlob.has_expired not auth, needed dirv " << lp->second.dirv
               << " for " << *dir << endl;
       continue;       // not our problem
     }
+    if (dir->get_committed_version() >= lp->second.dirv) {
+      dout(10) << "EMetaBlob.has_expired have dirv " << lp->second.dirv
+              << " for " << *dir << endl;
+      continue;       // yay
+    }
+    
+    if (dir->auth_is_ambiguous()) {
+      dout(10) << "EMetaBlob.has_expired ambiguous auth on " 
+              << *dir << endl;
+      return false;  // not committed.
+    }
 
     if (dir->get_committed_version() < lp->second.dirv) {
       dout(10) << "EMetaBlob.has_expired need dirv " << lp->second.dirv
               << " for " << *dir << endl;
       return false;  // not committed.
-    } else {
-      dout(10) << "EMetaBlob.has_expired have dirv " << lp->second.dirv
-              << " for " << *dir << endl;
     }
+
+    assert(0);  // i goofed the logic
   }
 
   return true;  // all dirlumps expired.
 }
 
+
 void EMetaBlob::expire(MDS *mds, Context *c)
 {
   map<CDir*,version_t> commit;  // dir -> version needed
@@ -122,6 +132,17 @@ void EMetaBlob::expire(MDS *mds, Context *c)
     
     // FIXME: check the slice only
 
+    if (dir->authority().first != mds->get_nodeid()) {
+      dout(10) << "EMetaBlob.expire not auth, needed dirv " << lp->second.dirv
+              << " for " << *dir << endl;
+      continue;     // not our problem
+    }
+    if (dir->get_committed_version() >= lp->second.dirv) {
+      dout(10) << "EMetaBlob.expire have dirv " << lp->second.dirv
+              << " on " << *dir << endl;
+      continue;   // yay
+    }
+    
     if (dir->auth_is_ambiguous()) {
       // wait until export is acked (logged on remote) and committed (logged locally)
       CDir *ex = mds->mdcache->get_subtree_root(dir);
@@ -130,30 +151,33 @@ void EMetaBlob::expire(MDS *mds, Context *c)
       waitfor_export.push_back(ex);
       continue;
     }
-    if (!dir->is_auth()) {
-      dout(10) << "EMetaBlob.expire not auth, needed dirv " << lp->second.dirv
-              << " for " << *dir << endl;
-      continue;     // not our problem
-    }
     if (dir->get_committed_version() < lp->second.dirv) {
       dout(10) << "EMetaBlob.expire need dirv " << lp->second.dirv
               << ", committing " << *dir << endl;
       commit[dir] = MAX(commit[dir], lp->second.dirv);
       ncommit++;
-    } else {
-      dout(10) << "EMetaBlob.expire have dirv " << lp->second.dirv
-              << " on " << *dir << endl;
+      continue;
     }
+
+    assert(0);  // hrm
   }
 
-  // commit
-  assert(!commit.empty());
+  // commit or wait for export
+  // FIXME: what if export aborts?  need to retry!
+  assert(!commit.empty() || !waitfor_export.empty());
 
+  //C_Gather *gather = new C_Gather(new C_journal_RetryExpire(mds, this, c));
   C_Gather *gather = new C_Gather(c);
   for (map<CDir*,version_t>::iterator p = commit.begin();
        p != commit.end();
-       ++p)
-    p->first->commit(p->second, gather->new_sub());
+       ++p) {
+    if (p->first->can_auth_pin())
+      p->first->commit(p->second, gather->new_sub());
+    else
+      // pbly about to export|split|merge. 
+      // just wait for it to unfreeze, then retry
+      p->first->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub());  
+  }
   for (list<CDir*>::iterator p = waitfor_export.begin();
        p != waitfor_export.end();
        ++p) 
index df34dc9b5d03dfd06b7968dc0d200d4af6eebb2f..9430381ca3f7914cc8103bb2291109a4f4ecd90e 100644 (file)
@@ -202,7 +202,7 @@ class MDSCacheObject {
   unsigned state;     // state bits
   
   int      ref;       // reference count
-  set<int> ref_set;
+  multiset<int> ref_set;
 
   map<int,int> replicas;      // [auth] mds -> nonce
   int          replica_nonce; // [replica] defined on replica
@@ -226,19 +226,20 @@ class MDSCacheObject {
   // pins
   int get_num_ref() { return ref; }
   bool is_pinned_by(int by) { return ref_set.count(by); }
-  set<int>& get_ref_set() { return ref_set; }
+  multiset<int>& get_ref_set() { return ref_set; }
+  virtual const char *pin_name(int by) = 0;
 
   virtual void last_put() {}
   virtual void bad_put(int by) {
-       assert(ref_set.count(by) == 1);
+       assert(ref_set.count(by) > 0);
        assert(ref > 0);
   }
   void put(int by) {
-    if (ref == 0 || ref_set.count(by) != 1) {
+    if (ref == 0 || ref_set.count(by) == 0) {
          bad_put(by);
     } else {
          ref--;
-         ref_set.erase(by);
+         ref_set.erase(ref_set.find(by));
          assert(ref == (int)ref_set.size());
          if (ref == 0)
                last_put();
@@ -247,11 +248,11 @@ class MDSCacheObject {
 
   virtual void first_get() {}
   virtual void bad_get(int by) {
-       assert(ref_set.count(by) == 0);
+       assert(by < 0 || ref_set.count(by) == 0);
        assert(0);
   }
   void get(int by) {
-    if (ref_set.count(by)) {
+    if (by >= 0 && ref_set.count(by)) {
          bad_get(by);
     } else {
          if (ref == 0) 
@@ -262,6 +263,20 @@ class MDSCacheObject {
        }
   }
 
+  void print_pin_set(ostream& out) {
+    multiset<int>::iterator it = ref_set.begin();
+    while (it != ref_set.end()) {
+      out << " " << pin_name(*it);
+      int last = *it;
+      int c = 1;
+      do {
+               it++;
+               if (it == ref_set.end()) break;
+      } while (*it == last);
+      if (c > 1)
+               out << "*" << c;
+    }
+  }
 
 
   // --------------------------------------------
index 1d694f1a4d7d5712ffd6cd181da9634cb15e8f5c..d8bc40838e8d40dbac4045a639f176966dcba98b 100644 (file)
@@ -22,7 +22,7 @@ class MExportDir : public Message {
   dirfrag_t dirfrag;
   
   list<bufferlist> dirstate; // a bl for reach dir
-  list<dirfrag_t>  exports;
+  list<dirfrag_t>  bounds;
 
  public:  
   MExportDir() {}
@@ -37,7 +37,7 @@ class MExportDir : public Message {
 
   dirfrag_t get_dirfrag() { return dirfrag; }
   list<bufferlist>& get_dirstate() { return dirstate; }
-  list<dirfrag_t>& get_exports() { return exports; }
+  list<dirfrag_t>& get_bounds() { return bounds; }
 
   void add_dir(bufferlist& dir) {
     dirstate.push_back(dir);
@@ -46,19 +46,19 @@ class MExportDir : public Message {
     dirstate = ls;
   }
   void add_export(dirfrag_t df) { 
-    exports.push_back(df); 
+    bounds.push_back(df); 
   }
 
   virtual void decode_payload() {
     int off = 0;
     payload.copy(off, sizeof(dirfrag), (char*)&dirfrag);
     off += sizeof(dirfrag);
-    ::_decode(exports, payload, off);
+    ::_decode(bounds, payload, off);
     ::_decode(dirstate, payload, off);
   }
   virtual void encode_payload() {
     payload.append((char*)&dirfrag, sizeof(dirfrag));
-    ::_encode(exports, payload);
+    ::_encode(bounds, payload);
     ::_encode(dirstate, payload);
   }
 
index 2fc85db4b67c9f7eb9f4319217dc958b844b807e..d9a6e038577f48c54344667cc4527ce9d2028574 100644 (file)
@@ -27,7 +27,7 @@ class MExportDirPrep : public Message {
      dentries are the links to each inode.
      dirs map includes base dir (ino)
   */
-  list<dirfrag_t>                exports;
+  list<dirfrag_t>                bounds;
 
   list<CInodeDiscover*>          inodes;
   map<inodeno_t,dirfrag_t>       inode_dirfrag;
@@ -42,7 +42,7 @@ class MExportDirPrep : public Message {
 
  public:
   dirfrag_t get_dirfrag() { return dirfrag; }
-  list<dirfrag_t>& get_exports() { return exports; }
+  list<dirfrag_t>& get_bounds() { return bounds; }
   list<CInodeDiscover*>& get_inodes() { return inodes; }
   list<frag_t>& get_inode_dirfrags(inodeno_t ino) { 
     return frags_by_ino[ino];
@@ -89,7 +89,7 @@ class MExportDirPrep : public Message {
   }
 
   void add_export(dirfrag_t df) {
-    exports.push_back( df );
+    bounds.push_back( df );
   }
   void add_inode(dirfrag_t df, const string& dentry, CInodeDiscover *in) {
     inodes.push_back(in);
@@ -109,7 +109,7 @@ class MExportDirPrep : public Message {
     payload.copy(off, sizeof(dirfrag), (char*)&dirfrag);
     off += sizeof(dirfrag);
     
-    ::_decode(exports, payload, off);
+    ::_decode(bounds, payload, off);
     
     // inodes
     int ni;
@@ -152,7 +152,7 @@ class MExportDirPrep : public Message {
   virtual void encode_payload() {
     payload.append((char*)&dirfrag, sizeof(dirfrag));
 
-    ::_encode(exports, payload);
+    ::_encode(bounds, payload);
 
     // inodes
     int ni = inodes.size();