]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
bounding dirfrag_t's maybe ambiguous
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 20 Jul 2007 17:47:49 +0000 (17:47 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 20 Jul 2007 17:47:49 +0000 (17:47 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1538 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/TODO
trunk/ceph/include/frag.h
trunk/ceph/mds/CDir.h
trunk/ceph/mds/CInode.cc
trunk/ceph/mds/CInode.h
trunk/ceph/mds/MDCache.cc
trunk/ceph/mds/MDCache.h
trunk/ceph/mds/Migrator.cc
trunk/ceph/mds/Migrator.h

index ea7c45bd3864c787fb2dc12246952751db6b0231..f56d68b1d8f1edcbb3cf41cc95470d8b41864cc5 100644 (file)
@@ -55,10 +55,15 @@ sage doc
 sage mds
 
 - the split/merge plan:
+
+***  - should get_dirfrags(frag_t) return partial matches?  bc we might have the two frags listed separately even tho they've merged..
+
   - dirfragtree is lazily consistent.  no lock.  bcast by primary when it updates.  
   - CDir is never request pinned
     - add a CInode sticky_dir flag to somehow pin all cdirs on the fly.  
     - STICKY dir state and pin?  make sure it's kept across import/export/fragment
+  - pull _bound maps out of Migrator; they are redundant (trust the subtree map!)
+
   - auth journals and applies update in the request update pipeline
   - bcast to dir replicas
   - inode auth will journal inode update separately/lazily
index 3dd2b6edf948f84ec0c425af09434c09cbfefed7..641aa0707ee4c11e54f253f6017cecacf36d8800 100644 (file)
@@ -137,8 +137,11 @@ class fragtree_t {
       return p->second;
   }
   void get_leaves(list<frag_t>& ls) const {
+    get_leaves(frag_t(), ls);
+  }
+  void get_leaves(frag_t under, list<frag_t>& ls) const {
     list<frag_t> q;
-    q.push_back(frag_t());
+    q.push_back(under);
     while (!q.empty()) {
       frag_t t = q.front();
       q.pop_front();
index 1c84d0ef7bb5533bd798ce10fb08f4756f4ae5d0..fff398f5b8932a9d014512e48d756dfaef38589e 100644 (file)
@@ -67,6 +67,7 @@ class CDir : public MDSCacheObject {
   static const int PIN_EXPORTING =    8;
   static const int PIN_IMPORTBOUND =  9;
   static const int PIN_EXPORTBOUND = 10;
+  static const int PIN_STICKY =      11;
   const char *pin_name(int p) {
     switch (p) {
     case PIN_DNWAITER: return "dnwaiter";
@@ -79,6 +80,7 @@ class CDir : public MDSCacheObject {
     case PIN_IMPORTBOUND: return "importbound";
     case PIN_EXPORTBOUND: return "exportbound";
     case PIN_AUTHPIN: return "authpin";
+    case PIN_STICKY: return "sticky";
     default: return generic_pin_name(p);
     }
   }
@@ -92,13 +94,13 @@ class CDir : public MDSCacheObject {
   static const unsigned STATE_COMMITTING =    (1<< 8);   // mid-commit
   static const unsigned STATE_FETCHING =      (1<< 9);   // currenting fetching
   static const unsigned STATE_DELETED =       (1<<10);
-  //static const unsigned STATE_IMPORT =        (1<<11);   // flag set if this is an import.
   static const unsigned STATE_EXPORT    =     (1<<12);
   static const unsigned STATE_IMPORTBOUND =   (1<<13);
   static const unsigned STATE_EXPORTBOUND =   (1<<14);
   static const unsigned STATE_EXPORTING =     (1<<15);
   static const unsigned STATE_IMPORTING =     (1<<16);
   static const unsigned STATE_FRAGMENTING =   (1<<17);
+  static const unsigned STATE_STICKY =        (1<<18);  // sticky pin due to inode stickydirs
 
   // common states
   static const unsigned STATE_CLEAN =  0;
@@ -113,13 +115,15 @@ class CDir : public MDSCacheObject {
   STATE_EXPORT
   |STATE_IMPORTING
   |STATE_IMPORTBOUND|STATE_EXPORTBOUND
-  |STATE_FROZENTREE;
+  |STATE_FROZENTREE
+  |STATE_STICKY;
   static const unsigned MASK_STATE_EXPORT_KEPT = 
   STATE_EXPORTING
   |STATE_IMPORTBOUND|STATE_EXPORTBOUND
   |STATE_FROZENTREE
   |STATE_FROZENDIR
-  |STATE_EXPORT;
+  |STATE_EXPORT
+  |STATE_STICKY;
 
 
   // -- rep spec --
index 83b7cdeb280efbf2acd3a4e42a2273c7cff109ae..9ede5b6d5360a7d6810211f314514f1d163885c8 100644 (file)
@@ -133,6 +133,17 @@ frag_t CInode::pick_dirfrag(const string& dn)
   return dirfragtree[H(dn)];
 }
 
+void CInode::get_dirfrags(frag_t fg, list<CDir*>& ls)
+{
+  list<frag_t> fglist;
+  dirfragtree.get_leaves(fg, fglist);
+  for (list<frag_t>::iterator p = fglist.begin();
+       p != fglist.end();
+       ++p) 
+    if (dirfrags.count(*p))
+      ls.push_back(dirfrags[*p]);
+}
+
 void CInode::get_dirfrags(list<CDir*>& ls) 
 {
   // all dirfrags
@@ -167,11 +178,12 @@ CDir *CInode::get_or_open_dirfrag(MDCache *mdcache, frag_t fg)
 
   // have it?
   CDir *dir = get_dirfrag(fg);
-  if (dir) return dir;
-  
-  // create it.
-  assert(is_auth());
-  dir = dirfrags[fg] = new CDir(this, fg, mdcache, true);
+  if (!dir) {
+    // create it.
+    assert(is_auth());
+    dir = new CDir(this, fg, mdcache, true);
+    add_dirfrag(dir);
+  }
   return dir;
 }
 
@@ -179,6 +191,12 @@ CDir *CInode::add_dirfrag(CDir *dir)
 {
   assert(dirfrags.count(dir->dirfrag().frag) == 0);
   dirfrags[dir->dirfrag().frag] = dir;
+
+  if (stickydir_ref > 0) {
+    dir->state_set(CDir::STATE_STICKY);
+    dir->get(CDir::PIN_STICKY);
+  }
+
   return dir;
 }
 
@@ -194,6 +212,11 @@ void CInode::close_dirfrag(frag_t fg)
   if (dir->is_dirty())
     dir->mark_clean();
   
+  if (stickydir_ref > 0) {
+    dir->state_clear(CDir::STATE_STICKY);
+    dir->put(CDir::PIN_STICKY);
+  }
+  
   // dump any remaining dentries, for debugging purposes
   for (map<string,CDentry*>::iterator p = dir->items.begin();
        p != dir->items.end();
@@ -222,6 +245,36 @@ bool CInode::has_subtree_root_dirfrag()
 }
 
 
+void CInode::get_stickydirs()
+{
+  if (stickydir_ref == 0) {
+    get(PIN_STICKYDIRS);
+    for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
+        p != dirfrags.end();
+        ++p) {
+      p->second->state_set(CDir::STATE_STICKY);
+      p->second->get(CDir::PIN_STICKY);
+    }
+  }
+  stickydir_ref++;
+}
+
+void CInode::put_stickydirs()
+{
+  assert(stickydir_ref > 0);
+  stickydir_ref--;
+  if (stickydir_ref == 0) {
+    put(PIN_STICKYDIRS);
+    for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
+        p != dirfrags.end();
+        ++p) {
+      p->second->state_clear(CDir::STATE_STICKY);
+      p->second->put(CDir::PIN_STICKY);
+    }
+  }
+}
+
+
 void CInode::fragment_dir(frag_t basefrag, int bits)
 {
   dout(10) << "fragment_dir " << basefrag << " by " << bits << endl;
index 9e58949f20d55e19a20a5874caafe000a2bc272c..b7adff9f69dbbc3638244821655c93d6055eb1ce 100644 (file)
@@ -66,6 +66,7 @@ class CInode : public MDSCacheObject {
   static const int PIN_REMOTEPARENT = 15;
   static const int PIN_BATCHOPENJOURNAL = 16;
   static const int PIN_SCATTERED = 17;
+  static const int PIN_STICKYDIRS = 18;
 
   const char *pin_name(int p) {
     switch (p) {
@@ -79,6 +80,7 @@ class CInode : public MDSCacheObject {
     case PIN_REMOTEPARENT: return "remoteparent";
     case PIN_BATCHOPENJOURNAL: return "batchopenjournal";
     case PIN_SCATTERED: return "scattered";
+    case PIN_STICKYDIRS: return "stickydirs";
     default: return generic_pin_name(p);
     }
   }
@@ -139,8 +141,11 @@ class CInode : public MDSCacheObject {
   
 
   // -- cache infrastructure --
+private:
   map<frag_t,CDir*> dirfrags; // cached dir fragments
+  int stickydir_ref;
 
+public:
   frag_t pick_dirfrag(const string &dn);
   bool has_dirfrags() { return !dirfrags.empty(); }
   CDir* get_dirfrag(frag_t fg) {
@@ -149,6 +154,7 @@ class CInode : public MDSCacheObject {
     else
       return 0;
   }
+  void get_dirfrags(frag_t fg, list<CDir*>& ls);
   void get_dirfrags(list<CDir*>& ls);
   void get_nested_dirfrags(list<CDir*>& ls);
   void get_subtree_dirfrags(list<CDir*>& ls);
@@ -158,8 +164,12 @@ class CInode : public MDSCacheObject {
   void close_dirfrags();
   bool has_subtree_root_dirfrag();
 
+  void get_stickydirs();
+  void put_stickydirs();  
+
   void fragment_dir(frag_t base, int bits);
 
+
  protected:
   // parent dentries in cache
   CDentry         *parent;             // primary link
@@ -198,6 +208,7 @@ protected:
   CInode(MDCache *c, bool auth=true) : 
     mdcache(c),
     last_open_journaled(0),
+    stickydir_ref(0),
     parent(0), force_auth(CDIR_AUTH_DEFAULT),
     replica_caps_wanted(0),
     auth_pins(0), nested_auth_pins(0),
index 444caf48127a67fb96e203e6e565c7813766fc5f..1d432f04495368ddc3529ca986a1593cbd253c21 100644 (file)
@@ -1017,8 +1017,14 @@ void MDCache::send_resolve_now(int who)
     
     if (migrator->is_importing(dir->dirfrag())) {
       // ambiguous (mid-import)
-      m->add_ambiguous_import(dir->dirfrag(), 
-                             migrator->get_import_bound_inos(dir->dirfrag()));
+      //  NOTE: because we are first authority, import state is at least IMPORT_LOGGINSTART.
+      assert(migrator->get_import_state(dir->dirfrag()) >= Migrator::IMPORT_LOGGINGSTART);
+      set<CDir*> bounds;
+      get_subtree_bounds(dir, bounds);
+      list<dirfrag_t> dfls;
+      for (set<CDir*>::iterator p = bounds.begin(); p != bounds.end(); ++p)
+       dfls.push_back((*p)->dirfrag());
+      m->add_ambiguous_import(dir->dirfrag(), dfls);
     } else {
       // not ambiguous.
       m->add_subtree(dir->dirfrag());
index fcff21976645e157531dee983a6d0464fd46d2bb..6f2dbee4293ea8b898d0604132f952161e6bb87d 100644 (file)
@@ -434,13 +434,14 @@ public:
       return inode_map[ino];
     return NULL;
   }
-  CDir* get_dir(inodeno_t dirino) {  // deprecated
-    return get_dirfrag(dirfrag_t(dirino, frag_t()));
-  }    
   CDir* get_dirfrag(dirfrag_t df) {
     if (!have_inode(df.ino)) return NULL;
     return inode_map[df.ino]->get_dirfrag(df.frag);
   }
+  void get_dirfrags(dirfrag_t df, list<CDir*>& ls) {
+    if (have_inode(df.ino))
+      inode_map[df.ino]->get_dirfrags(df.frag, ls);
+  }
 
   MDSCacheObject *get_object(MDSCacheObjectInfo &info);
 
index ae46f18f2b263f8b35f7b4e13a3903496281a601..050fa09a882718bcaccd2d6ec927b506db0ebcff 100644 (file)
@@ -206,13 +206,17 @@ void Migrator::handle_mds_failure_or_stop(int who)
       case EXPORT_PREPPING:
        if (p->second != EXPORT_WARNING) 
          dout(10) << "export state=loggingstart|prepping : unpinning bounds, unfreezing" << endl;
-       // unpin bounds
-       for (set<CDir*>::iterator p = export_bounds[dir].begin();
-            p != export_bounds[dir].end();
-            ++p) {
-         CDir *bd = *p;
-         bd->put(CDir::PIN_EXPORTBOUND);
-         bd->state_clear(CDir::STATE_EXPORTBOUND);
+       {
+         // unpin bounds
+         set<CDir*> bounds;
+         cache->get_subtree_bounds(dir, bounds);
+         for (set<CDir*>::iterator p = bounds.begin();
+              p != bounds.end();
+              ++p) {
+           CDir *bd = *p;
+           bd->put(CDir::PIN_EXPORTBOUND);
+           bd->state_clear(CDir::STATE_EXPORTBOUND);
+         }
        }
        dir->unfreeze_tree();
        cache->adjust_subtree_auth(dir, mds->get_nodeid());
@@ -354,7 +358,11 @@ void Migrator::handle_mds_failure_or_stop(int who)
       case IMPORT_ACKING:
        // hrm.  make this an ambiguous import, and wait for exporter recovery to disambiguate
        dout(10) << "import state=acking : noting ambiguous import " << *dir << endl;
-       cache->add_ambiguous_import(dir, import_bounds[dir]);
+       {
+         set<CDir*> bounds;
+         cache->get_subtree_bounds(dir, bounds);
+         cache->add_ambiguous_import(dir, bounds);
+       }
        break;
 
       case IMPORT_ABORTING:
@@ -566,8 +574,8 @@ void Migrator::export_frozen(CDir *dir)
   // note the bounds.
   //  force it into a subtree by listing auth as <me,me>.
   cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
-  cache->get_subtree_bounds(dir, export_bounds[dir]);
-  set<CDir*> &bounds = export_bounds[dir];
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
 
   // generate prep message, log entry.
   MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag());
@@ -664,10 +672,14 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
   }
 
   // send warnings
-  assert(export_peer.count(dir));
   int dest = export_peer[dir];
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+
+  assert(export_peer.count(dir));
   assert(export_warning_ack_waiting.count(dir) == 0);
   assert(export_notify_ack_waiting.count(dir) == 0);
+
   for (map<int,int>::iterator p = dir->replicas_begin();
        p != dir->replicas_end();
        ++p) {
@@ -680,7 +692,7 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
     MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
                                                    pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
                                                    pair<int,int>(mds->get_nodeid(),export_peer[dir]));
-    notify->copy_bounds(export_bounds[dir]);
+    notify->copy_bounds(bounds);
     mds->send_message_mds(notify, p->first, MDS_PORT_MIGRATOR);
     
   }
@@ -706,14 +718,12 @@ void Migrator::export_go(CDir *dir)
   export_warning_ack_waiting.erase(dir);
   export_state[dir] = EXPORT_EXPORTING;
 
-  assert(export_bounds.count(dir) == 1);
   assert(export_data.count(dir) == 0);
 
   assert(dir->get_cum_auth_pins() == 0);
 
   // set ambiguous auth
   cache->adjust_subtree_auth(dir, dest, mds->get_nodeid());
-  cache->verify_subtree_bounds(dir, export_bounds[dir]);
   
   // fill export message with cache data
   C_Contexts *fin = new C_Contexts;       // collect all the waiters
@@ -735,8 +745,10 @@ void Migrator::export_go(CDir *dir)
   req->set_dirstate( export_data[dir] );
 
   // add bounds to message
-  for (set<CDir*>::iterator p = export_bounds[dir].begin();
-       p != export_bounds[dir].end();
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+  for (set<CDir*>::iterator p = bounds.begin();
+       p != bounds.end();
        ++p)
     req->add_export((*p)->dirfrag());
 
@@ -957,11 +969,14 @@ void Migrator::handle_export_ack(MExportDirAck *m)
   export_state[dir] = EXPORT_LOGGINGFINISH;
   export_data.erase(dir);
   
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+
   // log completion
   EExport *le = new EExport(dir);
   le->metablob.add_dir( dir, false );
-  for (set<CDir*>::iterator p = export_bounds[dir].begin();
-       p != export_bounds[dir].end();
+  for (set<CDir*>::iterator p = bounds.begin();
+       p != bounds.end();
        ++p) {
     CDir *bound = *p;
     le->get_bounds().insert(bound->dirfrag());
@@ -988,17 +1003,17 @@ void Migrator::export_reverse(CDir *dir)
   dout(7) << "export_reverse " << *dir << endl;
   
   assert(export_state[dir] == EXPORT_EXPORTING);
-  assert(export_bounds.count(dir));
   assert(export_data.count(dir));
   
   // adjust auth, with possible subtree merge.
-  cache->verify_subtree_bounds(dir, export_bounds[dir]);
   cache->adjust_subtree_auth(dir, mds->get_nodeid());
   cache->try_subtree_merge(dir);
 
   // unpin bounds
-  for (set<CDir*>::iterator p = export_bounds[dir].begin();
-       p != export_bounds[dir].end();
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+  for (set<CDir*>::iterator p = bounds.begin();
+       p != bounds.end();
        ++p) {
     CDir *bd = *p;
     bd->put(CDir::PIN_EXPORTBOUND);
@@ -1028,7 +1043,6 @@ void Migrator::export_reverse(CDir *dir)
 
   // some clean up
   export_data.erase(dir);
-  export_bounds.erase(dir);
   export_warning_ack_waiting.erase(dir);
   export_notify_ack_waiting.erase(dir);
 
@@ -1045,11 +1059,12 @@ void Migrator::export_logged_finish(CDir *dir)
 {
   dout(7) << "export_logged_finish " << *dir << endl;
 
-  cache->verify_subtree_bounds(dir, export_bounds[dir]);
-
   // send notifies
   int dest = export_peer[dir];
 
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+
   for (set<int>::iterator p = export_notify_ack_waiting[dir].begin();
        p != export_notify_ack_waiting[dir].end();
        ++p) {
@@ -1065,7 +1080,7 @@ void Migrator::export_logged_finish(CDir *dir)
                                    pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN),
                                    pair<int,int>(dest, CDIR_AUTH_UNKNOWN));
 
-    notify->copy_bounds(export_bounds[dir]);
+    notify->copy_bounds(bounds);
     
     mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
   }
@@ -1153,8 +1168,10 @@ void Migrator::export_finish(CDir *dir)
   dir->unfreeze_tree();
   
   // unpin bounds
-  for (set<CDir*>::iterator p = export_bounds[dir].begin();
-       p != export_bounds[dir].end();
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+  for (set<CDir*>::iterator p = bounds.begin();
+       p != bounds.end();
        ++p) {
     CDir *bd = *p;
     bd->put(CDir::PIN_EXPORTBOUND);
@@ -1180,7 +1197,6 @@ void Migrator::export_finish(CDir *dir)
   dir->put(CDir::PIN_EXPORTING);
   export_state.erase(dir);
   export_peer.erase(dir);
-  export_bounds.erase(dir);
   export_notify_ack_waiting.erase(dir);
 
   // queue finishers
@@ -1375,8 +1391,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
     }
 
     // open export dirs/bounds?
-    assert(import_bound_inos.count(dir->dirfrag()) == 0);
-    import_bound_inos[dir->dirfrag()].clear();
     for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
          it != m->get_bounds().end();
          it++) {
@@ -1384,9 +1398,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
       CInode *in = cache->get_inode(it->ino);
       assert(in);
       
-      // note bound.
-      import_bound_inos[dir->dirfrag()].push_back(*it);
-
       CDir *dir = cache->get_dirfrag(*it);
       if (!dir) {
         dout(7) << "  opening nested export on " << *in << endl;
@@ -1400,24 +1411,33 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
   
 
   // verify we have all bounds
+  set<CDir*> import_bounds;
   int waiting_for = 0;
   for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
        it != m->get_bounds().end();
        it++) {
     dirfrag_t df = *it;
-    CDir *bound = cache->get_dirfrag(df);
-    if (bound) {
-      if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
-        dout(7) << "  pinning import bound " << *bound << endl;
-        bound->get(CDir::PIN_IMPORTBOUND);
-        bound->state_set(CDir::STATE_IMPORTBOUND);
-       import_bounds[dir].insert(bound);
+    CInode *in = cache->get_inode(df.ino);
+    assert(in);
+    list<frag_t> fglist;
+    in->dirfragtree.get_leaves(df.frag, fglist);
+    for (list<frag_t>::iterator q = fglist.begin();
+        q != fglist.end();
+        ++q) {
+      CDir *bound = cache->get_dirfrag(dirfrag_t(df.ino, *q));
+      if (bound) {
+       if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
+         dout(7) << "  pinning import bound " << *bound << endl;
+         bound->get(CDir::PIN_IMPORTBOUND);
+         bound->state_set(CDir::STATE_IMPORTBOUND);
+         import_bounds.insert(bound);
+       } else {
+         dout(7) << "  already pinned import bound " << *bound << endl;
+       }
       } else {
-        dout(7) << "  already pinned import bound " << *bound << endl;
+       dout(7) << "  waiting for nested export dir on " << *cache->get_inode(df.ino) << endl;
+       waiting_for++;
       }
-    } else {
-      dout(7) << "  waiting for nested export dir on " << *cache->get_inode(df.ino) << endl;
-      waiting_for++;
     }
   }
 
@@ -1428,9 +1448,9 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
 
     // note that i am an ambiguous auth for this subtree.
     // specify bounds, since the exporter explicitly defines the region.
-    cache->adjust_bounded_subtree_auth(dir, import_bounds[dir]
+    cache->adjust_bounded_subtree_auth(dir, import_bounds, 
                                       pair<int,int>(oldauth, mds->get_nodeid()));
-    cache->verify_subtree_bounds(dir, import_bounds[dir]);
+    cache->verify_subtree_bounds(dir, import_bounds);
     
     // freeze.
     dir->_freeze_tree();
@@ -1484,7 +1504,6 @@ void Migrator::handle_export_dir(MExportDir *m)
   
   // adjust auth (list us _first_)
   cache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);
-  cache->verify_subtree_bounds(dir, import_bounds[dir]);
 
   // add this crap to my cache
   map<int,entity_inst_t> imported_client_map;
@@ -1505,11 +1524,12 @@ void Migrator::handle_export_dir(MExportDir *m)
   dout(10) << " " << m->get_bounds().size() << " imported bounds" << endl;
   
   // include bounds in EImportStart
-  for (set<CDir*>::iterator it = import_bounds[dir].begin();
-       it != import_bounds[dir].end();
+  set<CDir*> import_bounds;
+  cache->get_subtree_bounds(dir, import_bounds);
+  for (set<CDir*>::iterator it = import_bounds.begin();
+       it != import_bounds.end();
        it++) {
     CDir *bd = *it;
-
     // include bounding dirs in EImportStart
     // (now that the interior metadata is already in the event)
     le->metablob.add_dir(bd, false);
@@ -1620,6 +1640,9 @@ void Migrator::import_notify_abort(CDir *dir)
 {
   dout(7) << "import_notify_abort " << *dir << endl;
   
+  set<CDir*> import_bounds;
+  cache->get_subtree_bounds(dir, import_bounds);
+
   for (set<int>::iterator p = import_bystanders[dir].begin();
        p != import_bystanders[dir].end();
        ++p) {
@@ -1629,7 +1652,7 @@ void Migrator::import_notify_abort(CDir *dir)
       new MExportDirNotify(dir->dirfrag(), true,
                           pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN),
                           pair<int,int>(import_peer[dir->dirfrag()], CDIR_AUTH_UNKNOWN));
-    notify->copy_bounds(import_bounds[dir]);
+    notify->copy_bounds(import_bounds);
     mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
   }
 }
@@ -1656,8 +1679,10 @@ void Migrator::import_reverse_unpin(CDir *dir)
   dir->state_clear(CDir::STATE_IMPORTING);
 
   // remove bound pins
-  for (set<CDir*>::iterator it = import_bounds[dir].begin();
-       it != import_bounds[dir].end();
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+  for (set<CDir*>::iterator it = bounds.begin();
+       it != bounds.end();
        it++) {
     CDir *bd = *it;
     bd->put(CDir::PIN_IMPORTBOUND);
@@ -1667,8 +1692,6 @@ void Migrator::import_reverse_unpin(CDir *dir)
   // clean up
   import_state.erase(dir->dirfrag());
   import_peer.erase(dir->dirfrag());
-  import_bound_inos.erase(dir->dirfrag());
-  import_bounds.erase(dir);
   import_bystanders.erase(dir);
 
   cache->show_subtrees();
@@ -1712,8 +1735,10 @@ void Migrator::import_finish(CDir *dir, bool now)
   dir->put(CDir::PIN_IMPORTING);
   dir->state_clear(CDir::STATE_IMPORTING);
 
-  for (set<CDir*>::iterator it = import_bounds[dir].begin();
-       it != import_bounds[dir].end();
+  set<CDir*> bounds;
+  cache->get_subtree_bounds(dir, bounds);
+  for (set<CDir*>::iterator it = bounds.begin();
+       it != bounds.end();
        it++) {
     CDir *bd = *it;
 
@@ -1726,15 +1751,12 @@ void Migrator::import_finish(CDir *dir, bool now)
   dir->unfreeze_tree();
 
   // adjust auth, with possible subtree merge.
-  cache->verify_subtree_bounds(dir, import_bounds[dir]);
   cache->adjust_subtree_auth(dir, mds->get_nodeid());
   cache->try_subtree_merge(dir);
   
   // clear import state (we're done!)
   import_state.erase(dir->dirfrag());
   import_peer.erase(dir->dirfrag());
-  import_bound_inos.erase(dir->dirfrag());
-  import_bounds.erase(dir);
   import_bystanders.erase(dir);
 
   // process delayed expires
index 69a5907723424c52c41eeaa8a71c9a0a915a0297..2a537738998d993cef33a2a8b1d190ef513b8695 100644 (file)
@@ -81,7 +81,7 @@ protected:
   // export fun
   map<CDir*,int>               export_state;
   map<CDir*,int>               export_peer;
-  map<CDir*,set<CDir*> >       export_bounds;
+  //map<CDir*,set<CDir*> >       export_bounds;
   map<CDir*,list<bufferlist> > export_data;   // only during EXPORTING state
   map<CDir*,set<int> >         export_warning_ack_waiting;
   map<CDir*,set<int> >         export_notify_ack_waiting;
@@ -115,8 +115,6 @@ public:
 protected:
   map<dirfrag_t,int>              import_state;  // FIXME make these dirfrags
   map<dirfrag_t,int>              import_peer;
-  map<dirfrag_t,list<dirfrag_t> > import_bound_inos;
-  map<CDir*,set<CDir*> >          import_bounds;
   map<CDir*,set<int> >            import_bystanders;
 
 
@@ -149,14 +147,6 @@ public:
     return 0;
   }
   bool is_importing() { return !import_state.empty(); }
-  const list<dirfrag_t>& get_import_bound_inos(dirfrag_t base) { 
-    assert(import_bound_inos.count(base));
-    return import_bound_inos[base];
-  }
-  const set<CDir*>& get_import_bounds(CDir *base) { 
-    assert(import_bounds.count(base));
-    return import_bounds[base];
-  }
 
   int get_import_state(dirfrag_t df) {
     assert(import_state.count(df));