]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
basic mds recovery now working, mostly missing graceful handling of surviving mds...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 4 Feb 2007 03:26:56 +0000 (03:26 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 4 Feb 2007 03:26:56 +0000 (03:26 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1075 29311d96-e01e-0410-9327-a35deaab8ce9

20 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/client/SyntheticClient.cc
branches/sage/cephmds2/include/buffer.h
branches/sage/cephmds2/mds/CDentry.h
branches/sage/cephmds2/mds/CDir.cc
branches/sage/cephmds2/mds/CDir.h
branches/sage/cephmds2/mds/Lock.h
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDCache.h
branches/sage/cephmds2/mds/MDS.cc
branches/sage/cephmds2/mds/MDS.h
branches/sage/cephmds2/mds/MDSMap.h
branches/sage/cephmds2/mds/Migrator.cc
branches/sage/cephmds2/mds/Migrator.h
branches/sage/cephmds2/messages/MMDSCacheRejoin.h
branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h [new file with mode: 0644]
branches/sage/cephmds2/messages/MMDSImportMap.h
branches/sage/cephmds2/mon/MDSMonitor.cc
branches/sage/cephmds2/msg/Message.cc
branches/sage/cephmds2/msg/Message.h

index 4031e9a0ee2b186131740065d4a933d2b0d5d780..1144efdf58b7e8b773c4501b28fef73d147194e2 100644 (file)
@@ -1,10 +1,23 @@
-- preemptive mdsmap sharing
 
-- how to disambiguate importstart w/o an importfinish?
-- der chicken/egg headache on immediately sending import_map, since imports are ambiguous.
+huh:
+- how to keep mds osd op ids unique after a failover?
+  - (and, how to flush out failed mds)
+
+
+- handle exporter recovery if importer fails during EXPORT_EXPORTING stage
+
+- delay response to sending import_map if export in progress?
+- finish export before sending import_map?  (if is_recoverying or whatever...)
+
 - ambiguous imports on active node should include in-progress imports!
 
+- how to effectively trim cache after resolve but before rejoin
+  - we need to eliminate unneed non-auth metadata, without hosing potentially useful auth metadata
+
+
+- falures during recovery stages... rejoin
+
+
 
 importmap only sent after exports have completed.
 failures update export ack waitlists, so exports will compelte if unrelated nodes fail.
@@ -13,7 +26,7 @@ failure of exporter induces some cleanup on importer.  will disambiguate when it
 failure of importer induces cleanup on exporter.  no ambiguity.
 
 
-- no new mds may join if cluster is in a recovery state.  creating|replay -> standby (unless failed)
+- no new mds may join if cluster is in a recovery state.  starting -> standby (unless failed)
 
 recoverer:
 - enter rejoin state
index b0569d52e553e554246a81526a073414df012637..1f8966e127b9123d493305d1790b62191f8970e4 100644 (file)
@@ -105,7 +105,7 @@ void parse_syn_options(vector<char*>& args)
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
 
-      } else if (strcmp(args[i],"fullwalk") == 0) {
+      } else if (strcmp(args[i],"walk") == 0) {
         syn_modes.push_back( SYNCLIENT_MODE_FULLWALK );
         //syn_sargs.push_back( atoi(args[++i]) );
       } else if (strcmp(args[i],"randomwalk") == 0) {
@@ -366,7 +366,7 @@ int SyntheticClient::run()
 
     case SYNCLIENT_MODE_FULLWALK:
       {
-        string sarg1 = get_sarg(0);
+        string sarg1;// = get_sarg(0);
         if (run_me()) {
           dout(2) << "fullwalk" << sarg1 << endl;
           full_walk(sarg1);
@@ -721,27 +721,39 @@ int SyntheticClient::full_walk(string& basedir)
 {
   if (time_to_stop()) return -1;
 
-  // read dir
-  map<string, inode_t> contents;
-  int r = client->getdir(basedir.c_str(), contents);
-  if (r < 0) {
-    dout(1) << "readdir on " << basedir << " returns " << r << endl;
-    return r;
-  }
+  list<string> dirq;
+  dirq.push_back(basedir);
 
-  for (map<string, inode_t>::iterator it = contents.begin();
-       it != contents.end();
-       it++) {
-    string file = basedir + "/" + it->first;
+  while (!dirq.empty()) {
+    string dir = dirq.front();
+    dirq.pop_front();
 
-    struct stat st;
-    int r = client->lstat(file.c_str(), &st);
+    // read dir
+    map<string, inode_t> contents;
+    int r = client->getdir(dir.c_str(), contents);
     if (r < 0) {
-      dout(1) << "stat error on " << file << " r=" << r << endl;
+      dout(1) << "readdir on " << dir << " returns " << r << endl;
       continue;
     }
-
-    if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) full_walk(file);
+    
+    for (map<string, inode_t>::iterator it = contents.begin();
+        it != contents.end();
+        it++) {
+      if (it->first == ".") continue;
+      if (it->first == "..") continue;
+      string file = dir + "/" + it->first;
+      
+      struct stat st;
+      int r = client->lstat(file.c_str(), &st);
+      if (r < 0) {
+       dout(1) << "stat error on " << file << " r=" << r << endl;
+       continue;
+      }
+      
+      if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) {
+       dirq.push_back(file);
+      }
+    }
   }
 
   return 0;
index 06bad7443f1c2ebd4e0531e7eb6702fe6a15c724..9fa9663ff1fd32c9f255a2d18e94d917d15e4007 100644 (file)
@@ -799,6 +799,34 @@ inline void _decode(bufferlist& s, bufferlist& bl, int& off)
 #include <vector>
 #include <string>
 
+// set<string>
+inline void _encode(const std::set<std::string>& s, bufferlist& bl)
+{
+  int n = s.size();
+  bl.append((char*)&n, sizeof(n));
+  for (std::set<std::string>::const_iterator it = s.begin();
+       it != s.end();
+       it++) {
+    ::_encode(*it, bl);
+    n--;
+  }
+  assert(n==0);
+}
+inline void _decode(std::set<std::string>& s, bufferlist& bl, int& off) 
+{
+  s.clear();
+  int n;
+  bl.copy(off, sizeof(n), (char*)&n);
+  off += sizeof(n);
+  for (int i=0; i<n; i++) {
+    std::string v;
+    ::_decode(v, bl, off);
+    s.insert(v);
+  }
+  assert(s.size() == (unsigned)n);
+}
+
+
 // set<T>
 template<class T>
 inline void _encode(const std::set<T>& s, bufferlist& bl)
index 3080a0b0c201a1b37b15539e1966dfb7ebe7b83f..65b9155ce69f959d83973d54e8843bc38844e008 100644 (file)
@@ -32,7 +32,7 @@ class CDir;
 #define DN_LOCK_SYNC      0
 #define DN_LOCK_PREXLOCK  1
 #define DN_LOCK_XLOCK     2
-#define DN_LOCK_UNPINNING 3  // waiting for pins to go away
+#define DN_LOCK_UNPINNING 3  // waiting for pins to go away .. FIXME REVIEW THIS CODE ..
 
 #define DN_XLOCK_FOREIGN  ((Message*)0x1)  // not 0, not a valid pointer.
 
index 8f5137514e00f7179bb0666dd40039e6c0914f26..578cb0c091bb88f4706e0f80ccfc9d2c99a6395d 100644 (file)
@@ -60,6 +60,13 @@ ostream& operator<<(ostream& out, CDir& dir)
     out << " dir_auth=" << dir.get_dir_auth();
   
   out << " state=" << dir.get_state();
+  if (dir.state_test(CDIR_STATE_COMPLETE)) out << "|complete";
+  if (dir.state_test(CDIR_STATE_FREEZINGTREE)) out << "|freezingtree";
+  if (dir.state_test(CDIR_STATE_FROZENTREE)) out << "|frozentree";
+  if (dir.state_test(CDIR_STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
+  if (dir.state_test(CDIR_STATE_FROZENDIR)) out << "|frozendir";
+  if (dir.state_test(CDIR_STATE_FREEZINGDIR)) out << "|freezingdir";
+
   out << " sz=" << dir.get_nitems() << "+" << dir.get_nnull();
   
   if (dir.get_num_ref()) {
@@ -716,13 +723,24 @@ void CDir::freeze_tree_finish(Context *c)
 void CDir::unfreeze_tree()
 {
   dout(10) << "unfreeze_tree " << *this << endl;
-  state_clear(CDIR_STATE_FROZENTREE);
-  
-  // unpin  (may => FREEZEABLE)   FIXME: is this order good?
-  inode->auth_unpin();
 
-  // waiters?
-  finish_waiting(CDIR_WAIT_UNFREEZE);
+  if (state_test(CDIR_STATE_FROZENTREE)) {
+    // frozen.  unfreeze.
+    state_clear(CDIR_STATE_FROZENTREE);
+
+    // unpin  (may => FREEZEABLE)   FIXME: is this order good?
+    inode->auth_unpin();
+
+    // waiters?
+    finish_waiting(CDIR_WAIT_UNFREEZE);
+  } else {
+    // freezing.  stop it.
+    assert(state_test(CDIR_STATE_FREEZINGTREE));
+    state_clear(CDIR_STATE_FREEZINGTREE);
+    
+    // cancel freeze waiters
+    finish_waiting(CDIR_WAIT_FREEZEABLE, -1);
+  }
 }
 
 bool CDir::is_freezing_tree()
index fa21c9cdcf3ef2559b66d327b92902a1e7e5c982..48df8c6f411b71ba09a71828c482d065a12a8eb6 100644 (file)
@@ -88,7 +88,8 @@ class Context;
                                     |CDIR_STATE_DIRTY)  
 #define CDIR_MASK_STATE_IMPORT_KEPT (CDIR_STATE_IMPORT\
                                     |CDIR_STATE_EXPORT\
-                                    |CDIR_STATE_IMPORTINGEXPORT)
+                                    |CDIR_STATE_IMPORTINGEXPORT\
+                                    |CDIR_STATE_FROZENTREE)
 #define CDIR_MASK_STATE_EXPORT_KEPT (CDIR_STATE_HASHED\
                                     |CDIR_STATE_FROZENTREE\
                                     |CDIR_STATE_FROZENDIR\
index 76fb9f5484d4901dc9dc5e2fd83c995817dbeaba..0d9dabb61b6690a0238b39290ff321e46e6c40f8 100644 (file)
@@ -85,7 +85,7 @@ class CLock {
   
  public:
   CLock() : 
-    state(LOCK_LOCK), 
+    state(LOCK_SYNC), 
     nread(0), 
     wrlock_by(0) {
   }
index 491b353326b0f849bf77993d46cc9ba16d59c300..6e351d3f9a4f479c7b21483b3dacb2df7dcc0f6e 100644 (file)
@@ -47,6 +47,7 @@
 
 #include "messages/MMDSImportMap.h"
 #include "messages/MMDSCacheRejoin.h"
+#include "messages/MMDSCacheRejoinAck.h"
 
 #include "messages/MDiscover.h"
 #include "messages/MDiscoverReply.h"
@@ -249,18 +250,6 @@ void MDCache::log_import_map(Context *onsync)
 // =====================
 // recovery stuff
 
-void MDCache::send_import_maps()
-{
-  dout(10) << "send_import_maps" << endl;
-  
-  for (set<int>::iterator p = want_import_map.begin();
-       p != want_import_map.end();
-       ++p)
-    send_import_map(*p);
-
-  want_import_map.clear();
-}
-
 void MDCache::send_import_map(int who)
 {
   dout(10) << "send_import_map to mds" << who << endl;
@@ -272,14 +261,22 @@ void MDCache::send_import_map(int who)
        p != imports.end();
        p++) {
     CDir *im = *p;
-    m->add_import(im->ino());
-    
-    if (nested_exports.count(im)) {
-      for (set<CDir*>::iterator q = nested_exports[im].begin();
-          q != nested_exports[im].end();
-          ++q) {
-       CDir *ex = *q;
-       m->add_import_export(im->ino(), ex->ino());
+
+    if (migrator->is_importing(im->ino())) {
+      // ambiguous (mid-import)
+      m->add_ambiguous_import(im->ino(), 
+                             migrator->get_import_bounds(im->ino()));
+    } else {
+      // not ambiguous.
+      m->add_import(im->ino());
+      
+      if (nested_exports.count(im)) {
+       for (set<CDir*>::iterator q = nested_exports[im].begin();
+            q != nested_exports[im].end();
+            ++q) {
+         CDir *ex = *q;
+         m->add_import_export(im->ino(), ex->ino());
+       }
       }
     }
   }
@@ -295,15 +292,18 @@ void MDCache::send_import_map(int who)
 }
 
 
+
+/*
+ * during resolve state, we share import_maps to determine who
+ * is authoritative for which trees.  we expect to get an import_map
+ * from _everyone_ in the recovery_set (the mds cluster at the time of
+ * the first failure).
+ */
 void MDCache::handle_import_map(MMDSImportMap *m)
 {
   dout(7) << "handle_import_map from " << m->get_source() << endl;
   int from = m->get_source().num();
 
-  MMDSCacheRejoin *rejoin;
-  if (mds->is_active() || mds->is_stopping())
-    rejoin = new MMDSCacheRejoin;
-
   // FIXME: check if we are a surviving ambiguous importer
 
   // update my dir_auth values
@@ -328,11 +328,6 @@ void MDCache::handle_import_map(MMDSImportMap *m)
       if (ex->get_dir_auth() == CDIR_AUTH_PARENT)
        ex->set_dir_auth(CDIR_AUTH_UNKNOWN);
     }
-
-    if (mds->is_active()) {
-      // walk my cache to fill out CacheRejoin
-      cache_rejoin_walk(im, rejoin);
-    }
   }
 
   // note ambiguous imports too
@@ -340,22 +335,23 @@ void MDCache::handle_import_map(MMDSImportMap *m)
        pi != m->ambiguous_imap.end();
        ++pi)
     mds->mdcache->other_ambiguous_imports[from][pi->first].swap( pi->second );
-  
-  if (mds->is_rejoin()) {
-    assert(want_import_map.count(from));
-    want_import_map.erase(from);
-    if (want_import_map.empty()) {
-      dout(10) << "got all import maps" << endl;
-      disambiguate_imports();
-      recalc_auth_bits();
-      send_cache_rejoins();
-    } else {
-      dout(10) << "still waiting for importmaps from " << want_import_map << endl;
-    }
-  } else if (mds->is_active() || mds->is_stopping()) {
-    mds->send_message_mds(rejoin, from, MDS_PORT_CACHE);
-  } 
 
+  // did i get them all?
+  got_import_map.insert(from);
+  
+  if (got_import_map == recovery_set) {
+    dout(10) << "got all import maps, ready to rejoin" << endl;
+    disambiguate_imports();
+    recalc_auth_bits();
+    
+    // move to rejoin state
+    mds->set_want_state(MDSMap::STATE_REJOIN);
+  
+  } else {
+    dout(10) << "still waiting for more importmaps, got " << got_import_map 
+            << ", need " << recovery_set << endl;
+  }
+  
   delete m;
 }
 
@@ -569,12 +565,66 @@ void MDCache::finish_ambiguous_export(inodeno_t dirino, set<inodeno_t>& bounds)
 
 
 
+/*
+ * rejoin phase!
+ * we start out by sending rejoins to everyone in the recovery set.
+ *
+ * if _were_ are rejoining, send for all regions in our cache.
+ * if we are active|stopping, send only to nodes that are are rejoining.
+ */
 void MDCache::send_cache_rejoins()
 {
   dout(10) << "send_cache_rejoins " << endl;
+
+  map<int, MMDSCacheRejoin*> rejoins;
   
+  // build list of dir_auth regions
+  list<CDir*> dir_auth_regions;
+  for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
+       p != inode_map.end();
+       ++p) {
+    if (!p->second->is_dir()) continue;
+    if (!p->second->dir) continue;
+    if (p->second->dir->get_dir_auth() == CDIR_AUTH_PARENT) continue;
+
+    int auth = p->second->dir->get_dir_auth();
+    assert(auth >= 0);
+
+    if (auth == mds->get_nodeid()) continue; // skip my own regions!
+
+    if (rejoins.count(auth) == 0) {
+      if (mds->is_rejoin() ||                // if i am rejoining,
+         mds->mdsmap->is_rejoin(auth))      // or if they are rejoining,
+       rejoins[auth] = new MMDSCacheRejoin; // send a rejoin
+      else
+       continue;   // otherwise, skip this region
+    }
+    
+    // add to list
+    dout(10) << " on mds" << auth << " region " << *p->second << endl;
+    dir_auth_regions.push_back(p->second->dir);
+  }
+
+  // walk the regions
+  for (list<CDir*>::iterator p = dir_auth_regions.begin();
+       p != dir_auth_regions.end();
+       ++p) {
+    CDir *dir = *p;
+    int to = dir->authority();
+    cache_rejoin_walk(dir, rejoins[to]);
+  }
+
+  // send the messages
+  assert(rejoin_ack_gather.empty());
+  for (map<int,MMDSCacheRejoin*>::iterator p = rejoins.begin();
+       p != rejoins.end();
+       ++p) {
+    mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+    rejoin_ack_gather.insert(p->first);
+  }
 }
 
+
 void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
 {
   dout(10) << "cache_rejoin_walk " << *dir << endl;
@@ -587,22 +637,13 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
        p != dir->items.end();
        ++p) {
     // dentry
-    if (mds->is_rejoin())
-      rejoin->add_dentry(dir->ino(), p->first, -1);
-    else
-      rejoin->add_dentry(dir->ino(), p->first, p->second->lockstate);
-
+    rejoin->add_dentry(dir->ino(), p->first);
+    
     // inode?
     if (p->second->is_primary() && p->second->get_inode()) {
       CInode *in = p->second->get_inode();
-      if (mds->is_rejoin())
-       rejoin->add_inode(in->ino(), 
-                         -1, -1,
-                         in->get_caps_wanted());
-      else
-       rejoin->add_inode(in->ino(), 
-                         in->hardlock.get_state(), in->filelock.get_state(),
-                         in->get_caps_wanted());
+      rejoin->add_inode(in->ino(), 
+                       in->get_caps_wanted());
       
       // dir?
       if (in->dir &&
@@ -619,20 +660,169 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
 }
 
 
+/*
+ * i got a rejoin.
+ * 
+ *  - reply with the lockstate
+ *
+ * if i am active|stopping, 
+ *  - remove source from replica list for everything not referenced here.
+ */
 void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m)
 {
   dout(7) << "handle_cache_rejoin from " << m->get_source() << endl;
-  //int from = m->get_source().num();
+  int from = m->get_source().num();
+
+  MMDSCacheRejoinAck *ack = new MMDSCacheRejoinAck;
+
+  if (mds->is_active() || mds->is_stopping()) {
+    dout(10) << "removing stale cache replicas" << endl;
+    // first, scour cache of replica references
+    for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
+        p != inode_map.end();
+        ++p) {
+      // inode
+      CInode *in = p->second;
+      if (in->is_replica(from) && m->inodes.count(p->first) == 0) {
+       inode_remove_replica(in, from);
+       dout(10) << " rem " << *in << endl;
+      }
+
+      // dentry
+      if (in->parent) {
+       CDentry *dn = in->parent;
+       if (dn->is_replica(from) &&
+           (m->dentries.count(dn->get_dir()->ino()) == 0 ||
+            m->dentries[dn->get_dir()->ino()].count(dn->get_name()) == 0)) {
+         dn->remove_replica(from);
+         dout(10) << " rem " << *dn << endl;
+       }
+      }
 
+      // dir
+      if (in->dir) {
+       CDir *dir = in->dir;
+       if (dir->is_replica(from) && m->dirs.count(p->first) == 0) {
+         dir->remove_replica(from);
+         dout(10) << " rem " << *dir << endl;
+       }
+      }
+    }
+  } else {
+    assert(mds->is_rejoin());
+  }
+
+  // dirs
+  for (set<inodeno_t>::iterator p = m->dirs.begin();
+       p != m->dirs.end();
+       ++p) {
+    CInode *diri = get_inode(*p);
+    assert(diri);
+    CDir *dir = diri->dir;
+    assert(dir);
+    int nonce = dir->add_replica(from);
+    dout(10) << " has " << *dir << endl;
+    ack->add_dir(*p, nonce);
+    
+    // dentries
+    for (set<string>::iterator q = m->dentries[*p].begin();
+        q != m->dentries[*p].end();
+        ++q) {
+      CDentry *dn = dir->lookup(*q);
+      assert(dn);
+      int nonce = dn->add_replica(from);
+      dout(10) << " has " << *dn << endl;
+      ack->add_dentry(*p, *q, dn->get_lockstate(), nonce);
+    }
+  }
+
+  // inodes
+  for (map<inodeno_t,int>::iterator p = m->inodes.begin();
+       p != m->inodes.end();
+       ++p) {
+    CInode *in = get_inode(p->first);
+    assert(in);
+    int nonce = in->add_replica(from);
+    if (p->second)
+      in->mds_caps_wanted[from] = p->second;
+    else
+      in->mds_caps_wanted.erase(from);
+    in->hardlock.gather_set.erase(from);  // just in case
+    in->filelock.gather_set.erase(from);  // just in case
+    dout(10) << " has " << *in << endl;
+    ack->add_inode(p->first, 
+                  in->hardlock.get_replica_state(), in->filelock.get_replica_state(), 
+                  nonce);
+  }
+
+  // send ack
+  mds->send_message_mds(ack, from, MDS_PORT_CACHE);
   
+  delete m;
+}
+
+
+void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m)
+{
+  dout(7) << "handle_cache_rejoin from " << m->get_source() << endl;
+  int from = m->get_source().num();
+  
+  // dirs
+  for (list<MMDSCacheRejoinAck::dirinfo>::iterator p = m->dirs.begin();
+       p != m->dirs.end();
+       ++p) {
+    CInode *diri = get_inode(p->dirino);
+    CDir *dir = diri->dir;
+    assert(dir);
 
+    dir->set_replica_nonce(p->nonce);
+    dout(10) << " got " << *dir << endl;
+
+    // dentries
+    for (map<string,MMDSCacheRejoinAck::dninfo>::iterator q = m->dentries[p->dirino].begin();
+        q != m->dentries[p->dirino].end();
+        ++q) {
+      CDentry *dn = dir->lookup(q->first);
+      assert(dn);
+      dn->set_replica_nonce(q->second.nonce);
+      dn->set_lockstate(q->second.lock);
+      dout(10) << " got " << *dn << endl;
+    }
+  }
+
+  // inodes
+  for (list<MMDSCacheRejoinAck::inodeinfo>::iterator p = m->inodes.begin();
+       p != m->inodes.end();
+       ++p) {
+    CInode *in = get_inode(p->ino);
+    assert(in);
+    in->set_replica_nonce(p->nonce);
+    in->hardlock.set_state(p->hardlock);
+    in->filelock.set_state(p->filelock);
+    dout(10) << " got " << *in << endl;
+  }
 
   delete m;
+
+  // done?
+  rejoin_ack_gather.erase(from);
+  if (rejoin_ack_gather.empty()) {
+    dout(7) << "all done, going active!" << endl;
+    show_imports();
+    show_cache();
+    mds->set_want_state(MDSMap::STATE_ACTIVE);
+  } else {
+    dout(7) << "still need rejoin_ack from " << rejoin_ack_gather << endl;
+  }
+
 }
 
 
 
 
+
+// ===============================================================================
+
 void MDCache::rename_file(CDentry *srcdn, 
                           CDentry *destdn)
 {
@@ -1209,6 +1399,14 @@ void MDCache::dispatch(Message *m)
     handle_import_map((MMDSImportMap*)m);
     break;
 
+  case MSG_MDS_CACHEREJOIN:
+    handle_cache_rejoin((MMDSCacheRejoin*)m);
+    break;
+  case MSG_MDS_CACHEREJOINACK:
+    handle_cache_rejoin_ack((MMDSCacheRejoinAck*)m);
+    break;
+
+
   case MSG_MDS_DISCOVER:
     handle_discover((MDiscover*)m);
     break;
@@ -2554,27 +2752,7 @@ void MDCache::handle_cache_expire(MCacheExpire *m)
     else if (nonce == in->get_replica_nonce(from)) {
       // remove from our cached_by
       dout(7) << "inode expire on " << *in << " from mds" << from << " cached_by was " << in->get_replicas() << endl;
-      in->remove_replica(from);
-      in->mds_caps_wanted.erase(from);
-      
-      // note: this code calls _eval more often than it needs to!
-      // fix lock
-      if (in->hardlock.is_gathering(from)) {
-        in->hardlock.gather_set.erase(from);
-        if (in->hardlock.gather_set.size() == 0)
-          mds->locker->inode_hard_eval(in);
-      }
-      if (in->filelock.is_gathering(from)) {
-        in->filelock.gather_set.erase(from);
-        if (in->filelock.gather_set.size() == 0)
-          mds->locker->inode_file_eval(in);
-      }
-      
-      // alone now?
-      if (!in->is_replicated()) {
-        mds->locker->inode_hard_eval(in);
-        mds->locker->inode_file_eval(in);
-      }
+      inode_remove_replica(in, from);
 
     } 
     else {
@@ -2693,6 +2871,30 @@ void MDCache::handle_cache_expire(MCacheExpire *m)
   delete m;
 }
 
+void MDCache::inode_remove_replica(CInode *in, int from)
+{
+  in->remove_replica(from);
+  in->mds_caps_wanted.erase(from);
+  
+  // note: this code calls _eval more often than it needs to!
+  // fix lock
+  if (in->hardlock.is_gathering(from)) {
+    in->hardlock.gather_set.erase(from);
+    if (in->hardlock.gather_set.size() == 0)
+      mds->locker->inode_hard_eval(in);
+  }
+  if (in->filelock.is_gathering(from)) {
+    in->filelock.gather_set.erase(from);
+    if (in->filelock.gather_set.size() == 0)
+      mds->locker->inode_file_eval(in);
+  }
+  
+  // alone now?
+  if (!in->is_replicated()) {
+    mds->locker->inode_hard_eval(in);
+    mds->locker->inode_file_eval(in);
+  }
+}
 
 
 int MDCache::send_dir_updates(CDir *dir, bool bcast)
index 7715f9e50d963445d3c01c96b722e33b3f7ec066..c388bc8f921c1299b55efb2fb46cacf16338c633 100644 (file)
@@ -131,22 +131,23 @@ protected:
   // from MMDSImportMaps
   map<int, map<inodeno_t, set<inodeno_t> > > other_ambiguous_imports;  
 
-  set<int> want_import_map;    // nodes i need to send my import map to (when exports finish)
-  set<int> import_map_gather;  // nodes i need an import_map from
+  set<int> recovery_set;
+  set<int> got_import_map;    // nodes i need to send my import map to (when exports finish)
+  set<int> rejoin_ack_gather;  // nodes i need a rejoin ack from
   
   void handle_import_map(MMDSImportMap *m);
   void handle_cache_rejoin(MMDSCacheRejoin *m);
   void handle_cache_rejoin_ack(MMDSCacheRejoinAck *m);
   void disambiguate_imports();
-  void send_cache_rejoins();
   void cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin);
   void send_cache_rejoin_acks();
 public:
   void send_import_map(int who);
   void send_import_maps();
+  void send_cache_rejoins();
 
   void set_recovery_set(set<int>& s) {
-    want_import_map.swap(s);
+    recovery_set = s;
   }
 
   // ambiguous imports
@@ -194,8 +195,7 @@ public:
 
   void log_import_map(Context *onsync=0);
 
-
-  
   // cache
   void set_cache_size(size_t max) { lru.lru_set_max(max); }
   size_t get_cache_size() { return lru.lru_get_size(); }
@@ -243,6 +243,9 @@ public:
     else
       lru.lru_midtouch(dn);
   }
+
+  void inode_remove_replica(CInode *in, int rep);
+
   void rename_file(CDentry *srcdn, CDentry *destdn);
 
  public:
index c8c67949b5e89b352c3c21637414d9dbd3a2fe98..7d0a2b874255cd81a84e826f7e5c545b7daa4572 100644 (file)
@@ -422,7 +422,7 @@ void MDS::handle_mds_map(MMDSMap *m)
 
   // is it new?
   if (epoch <= mdsmap->get_epoch()) {
-    dout(1) << " old map epoch " << epoch << " < " << mdsmap->get_epoch() 
+    dout(1) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch() 
            << ", discarding" << endl;
     delete m;
     return;
@@ -431,8 +431,9 @@ void MDS::handle_mds_map(MMDSMap *m)
   // note some old state
   int oldwhoami = whoami;
   int oldstate = state;
-  set<int> oldrejoin;
-  mdsmap->get_mds_set(oldrejoin, MDSMap::STATE_REJOIN);
+  set<int> oldresolve;
+  mdsmap->get_mds_set(oldresolve, MDSMap::STATE_RESOLVE);
+  bool wasrejoining = mdsmap->is_rejoining();
   set<int> oldfailed;
   mdsmap->get_mds_set(oldfailed, MDSMap::STATE_FAILED);
 
@@ -512,19 +513,26 @@ void MDS::handle_mds_map(MMDSMap *m)
   }
   
   
-
-  // is anybody rejoining?
-  if (is_rejoin() || is_active() || is_stopping()) {
-    set<int> rejoin;
-    mdsmap->get_mds_set(rejoin, MDSMap::STATE_REJOIN);
-    dout(10) << "rejoin set is " << rejoin << ", was " << oldrejoin << endl;
-    for (set<int>::iterator p = rejoin.begin(); p != rejoin.end(); ++p) {
+  // is anyone resolving?
+  if (is_resolve() || is_rejoin() || is_active() || is_stopping()) {
+    set<int> resolve;
+    mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
+    if (oldresolve != resolve) 
+      dout(10) << "resolve set is " << resolve << ", was " << oldresolve << endl;
+    for (set<int>::iterator p = resolve.begin(); p != resolve.end(); ++p) {
       if (*p == whoami) continue;
-      if (oldrejoin.count(*p) == 0 ||          // if other guy newly rejoin, or
-         oldstate == MDSMap::STATE_REPLAY)    // if i'm newly rejoin,
+      if (oldresolve.count(*p) == 0 ||         // if other guy newly resolve, or
+         oldstate == MDSMap::STATE_REPLAY)    // if i'm newly resolve,
        mdcache->send_import_map(*p);          // share my import map
     }
   }
+  
+  // is everybody finally rejoining?
+  if (is_rejoin() || is_active() || is_stopping()) {
+    if (!wasrejoining && mdsmap->is_rejoining()) {
+      mdcache->send_cache_rejoins();
+    }
+  }
 
   // did anyone go down?
   if (is_active() || is_stopping()) {
@@ -655,7 +663,7 @@ void MDS::boot_finish()
     assert(mdlog->get_read_pos() == mdlog->get_write_pos());
   }
 
-  mark_active();
+  set_want_state(MDSMap::STATE_ACTIVE);
 }
 
 
@@ -705,26 +713,28 @@ void MDS::boot_replay(int step)
     
   case 6:
     // done with replay!
-    if (mdsmap->get_num_mds(MDSMap::STATE_REJOIN) == 0 &&
+    if (mdsmap->get_num_mds(MDSMap::STATE_ACTIVE) == 0 &&
+       mdsmap->get_num_mds(MDSMap::STATE_STOPPING) == 0 &&
+       mdsmap->get_num_mds(MDSMap::STATE_RESOLVE) == 0 &&
+       mdsmap->get_num_mds(MDSMap::STATE_REJOIN) == 0 &&
        mdsmap->get_num_mds(MDSMap::STATE_REPLAY) == 1 && // me
        mdsmap->get_num_mds(MDSMap::STATE_FAILED) == 0) {
       dout(2) << "boot_replay " << step << ": i am alone, moving to state active" << endl;      
-      want_state = MDSMap::STATE_ACTIVE;
+      set_want_state(MDSMap::STATE_ACTIVE);
     } else {
-      dout(2) << "boot_replay " << step << ": i am not alone, moving to state rejoin" << endl;
-      want_state = MDSMap::STATE_REJOIN;
+      dout(2) << "boot_replay " << step << ": i am not alone, moving to state resolve" << endl;
+      set_want_state(MDSMap::STATE_RESOLVE);
     }
-    beacon_send();
     break;
 
   }
 }
 
 
-void MDS::mark_active()
+void MDS::set_want_state(int s)
 {
-  dout(3) << "mark_active" << endl;
-  want_state = MDSMap::STATE_ACTIVE;
+  dout(3) << "set_want_state " << MDSMap::get_state_name(s) << endl;
+  want_state = s;
   beacon_send();
 }
 
@@ -750,8 +760,7 @@ int MDS::shutdown_start()
   }
 
   // go
-  want_state = MDSMap::STATE_STOPPING;
-  beacon_send();
+  set_want_state(MDSMap::STATE_STOPPING);
   return 0;
 }
 
@@ -761,8 +770,7 @@ void MDS::handle_shutdown_start(Message *m)
   dout(1) << " handle_shutdown_start" << endl;
 
   // set flag
-  want_state = MDSMap::STATE_STOPPING;
-  beacon_send();
+  set_want_state(MDSMap::STATE_STOPPING);
 
   delete m;
 }
@@ -774,8 +782,7 @@ int MDS::shutdown_final()
   dout(1) << "shutdown_final" << endl;
 
   // send final down:out beacon (it doesn't matter if this arrives)
-  want_state = MDSMap::STATE_OUT;
-  beacon_send();
+  set_want_state(MDSMap::STATE_OUT);
 
   // stop timers
   if (beacon_killer) {
@@ -817,6 +824,24 @@ void MDS::dispatch(Message *m)
 
 void MDS::my_dispatch(Message *m)
 {
+  // from bad mds?
+  if (m->get_source().is_mds()) {
+    int from = m->get_source().num();
+    if (!mdsmap->have_inst(from) ||
+       mdsmap->get_inst(from) != m->get_source_inst()) {
+      // bogus mds?
+      if (m->get_type() != MSG_MDS_MAP) {
+       dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
+               << ", dropping" << endl;
+       delete m;
+       return;
+      } else {
+       dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
+               << ", but it's an mdsmap, looking at it" << endl;
+      }
+    }
+  }
+
 
   switch (m->get_dest_port()) {
     
@@ -916,8 +941,7 @@ void MDS::my_dispatch(Message *m)
       dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to up:stopped" << endl;
 
       // tell monitor we shut down cleanly.
-      want_state = MDSMap::STATE_STOPPED;
-      beacon_send();
+      set_want_state(MDSMap::STATE_STOPPED);
     }
   }
 
@@ -957,6 +981,9 @@ void MDS::proc_message(Message *m)
   case MSG_PING:
     handle_ping((MPing*)m);
     return;
+
+  default:
+    assert(0);
   }
 
 }
index 85723572134d42dc33afce591d822d0d24010756..aed6f9b1dd3b2dcea011fef923c4ab37ec372670 100644 (file)
@@ -152,12 +152,13 @@ class MDS : public Dispatcher {
   bool is_starting() { return state == MDSMap::STATE_STARTING; }
   bool is_standby()  { return state == MDSMap::STATE_STANDBY; }
   bool is_replay()   { return state == MDSMap::STATE_REPLAY; }
+  bool is_resolve()  { return state == MDSMap::STATE_RESOLVE; }
   bool is_rejoin()   { return state == MDSMap::STATE_REJOIN; }
   bool is_active()   { return state == MDSMap::STATE_ACTIVE; }
   bool is_stopping() { return state == MDSMap::STATE_STOPPING; }
   bool is_stopped()  { return state == MDSMap::STATE_STOPPED; }
 
-  void mark_active();
+  void set_want_state(int s);
 
 
   // -- waiters --
index f2803142d2c092df642f1086759c843445feaa8c..21577475c9ac7f9d9e734114bf71f28f6b5ae260 100644 (file)
@@ -27,6 +27,7 @@ using namespace std;
 
 class MDSMap {
  public:
+  // mds states
   static const int STATE_DNE =      0;    // down, never existed.
   static const int STATE_OUT =      1;    // down, once existed, but no imports, empty log.
   static const int STATE_FAILED =   2;    // down, holds (er, held) metadata; needs to be recovered.
@@ -35,10 +36,11 @@ class MDSMap {
   static const int STATE_CREATING = 4;    // up, creating MDS instance (new journal, idalloc..)
   static const int STATE_STARTING = 5;    // up, starting prior out MDS instance.
   static const int STATE_REPLAY   = 6;    // up, scanning journal, recoverying any shared state
-  static const int STATE_REJOIN   = 7;    // up, replayed journal, rejoining distributed cache
-  static const int STATE_ACTIVE =   8;    // up, active
-  static const int STATE_STOPPING = 9;    // up, exporting metadata (-> standby or out)
-  static const int STATE_STOPPED  = 10;   // up, finished stopping.  like standby, but not avail to takeover.
+  static const int STATE_RESOLVE  = 7;    // up, disambiguating partial distributed operations (import/export, ...rename?)
+  static const int STATE_REJOIN   = 8;    // up, replayed journal, rejoining distributed cache
+  static const int STATE_ACTIVE =   9;    // up, active
+  static const int STATE_STOPPING = 10;    // up, exporting metadata (-> standby or out)
+  static const int STATE_STOPPED  = 11;   // up, finished stopping.  like standby, but not avail to takeover.
   
   static const char *get_state_name(int s) {
     switch (s) {
@@ -51,6 +53,7 @@ class MDSMap {
     case STATE_CREATING: return "up:creating";
     case STATE_STARTING: return "up:starting";
     case STATE_REPLAY:   return "up:replay";
+    case STATE_RESOLVE:  return "up:resolve";
     case STATE_REJOIN:   return "up:rejoin";
     case STATE_ACTIVE:   return "up:active";
     case STATE_STOPPING: return "up:stopping";
@@ -148,13 +151,13 @@ class MDSMap {
         p != mds_state.end();
         p++)
       if (is_failed(p->first) || 
-         is_replay(p->first) || is_rejoin(p->first) ||
+         is_replay(p->first) || is_resolve(p->first) || is_rejoin(p->first) ||
          is_active(p->first) || is_stopping(p->first))
        s.insert(p->first);
   }
 
 
-  // state
+  // mds states
   bool is_down(int m) { return is_dne(m) || is_out(m) || is_failed(m); }
   bool is_up(int m) { return !is_down(m); }
 
@@ -166,18 +169,33 @@ class MDSMap {
   bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; }
   bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; }
   bool is_replay(int m)    { return mds_state.count(m) && mds_state[m] == STATE_REPLAY; }
+  bool is_resolve(int m)   { return mds_state.count(m) && mds_state[m] == STATE_RESOLVE; }
   bool is_rejoin(int m)    { return mds_state.count(m) && mds_state[m] == STATE_REJOIN; }
   bool is_active(int m)   { return mds_state.count(m) && mds_state[m] == STATE_ACTIVE; }
   bool is_stopping(int m) { return mds_state.count(m) && mds_state[m] == STATE_STOPPING; }
   bool is_stopped(int m)  { return mds_state.count(m) && mds_state[m] == STATE_STOPPED; }
 
+  bool has_created(int m) { return mds_created.count(m); }
+
+  // cluster states
   bool is_degraded() {
-    return get_num_mds(STATE_REPLAY) + get_num_mds(STATE_REJOIN) + get_num_mds(STATE_FAILED);
+    return get_num_mds(STATE_REPLAY) + 
+      get_num_mds(STATE_RESOLVE) + 
+      get_num_mds(STATE_REJOIN) + 
+      get_num_mds(STATE_FAILED);
   }
-  bool is_created(int m) {
-    return mds_created.count(m);
+  /*bool is_resolving() {  // nodes are resolving distributed ops
+    return get_num_mds(STATE_RESOLVE);
+    }*/
+  bool is_rejoining() {  
+    // nodes are rejoining cache state
+    return get_num_mds(STATE_REJOIN) > 0 &&
+      get_num_mds(STATE_RESOLVE) == 0 &&
+      get_num_mds(STATE_REPLAY) == 0 &&
+      get_num_mds(STATE_FAILED) == 0;
   }
 
+
   int get_state(int m) {
     if (mds_state.count(m)) return mds_state[m];
     return STATE_OUT;
index 40d4e7db837aaeb8ca9a2b954de7c99136080565..7e4b0b19391bdd0454eafead011ef117778334c7 100644 (file)
@@ -210,8 +210,90 @@ void Migrator::export_empty_import(CDir *dir)
 
 
 
+
 // ==========================================================
-// IMPORT/EXPORT
+// mds failure handling
+
+void Migrator::handle_mds_failure(int who)
+{
+  dout(5) << "handle_mds_failure mds" << who << endl;
+
+  // check my exports
+  map<CDir*,int>::iterator p = export_state.begin();
+  while (p != export_state.end()) {
+    map<CDir*,int>::iterator next = p;
+    next++;
+    CDir *dir = p->first;
+    
+    if (export_peer[dir] == who) {
+      // the guy i'm exporting to failed.  
+      // clean up.
+      dout(10) << "cleaning up export state " << p->second << " of " << *dir << endl;
+      
+      switch (p->second) {
+      case EXPORT_DISCOVERING:
+       dout(10) << "state discovering : canceling freeze and removing auth_pin" << endl;
+       dir->unfreeze_tree();  // cancel the freeze
+       dir->auth_unpin();     // remove the auth_pin (that was holding up the freeze)
+       break;
+
+      case EXPORT_FREEZING:
+       dout(10) << "state freezing : canceling freeze" << endl;
+       dir->unfreeze_tree();  // cancel the freeze
+       break;
+
+      case EXPORT_LOGGINGSTART:
+      case EXPORT_PREPPING:
+       dout(10) << "state loggingstart|prepping : logging EExportFinish(false)" << endl;
+       mds->mdlog->submit_entry(new EExportFinish(dir,false));
+       // logger will unfreeze.
+       break;
+
+      case EXPORT_EXPORTING:
+       dout(10) << "state exporting : logging EExportFinish(false), reversing, and unfreezing" << endl;
+       mds->mdlog->submit_entry(new EExportFinish(dir,false));
+       reverse_export(dir);
+       dir->unfreeze_tree();
+       break;
+
+      case EXPORT_LOGGINGFINISH:
+       dout(10) << "state loggingfinish : doing nothing, we were successful." << endl;
+       break;
+
+      default:
+       assert(0);
+      }
+
+      export_state.erase(dir);
+      export_peer.erase(dir);
+      
+    } else {
+      // third party failed.  potential peripheral damage?
+      if (p->second == EXPORT_EXPORTING) {
+       // yeah, i'm waiting for acks, let's fake theirs.
+       if (export_notify_ack_waiting[dir].count(who)) {
+         dout(10) << "faking export_dir_notify_ack from mds" << who
+                  << " on " << *dir << " to mds" << export_peer[dir] 
+                  << endl;
+         export_notify_ack_waiting[dir].erase(who);
+         if (export_notify_ack_waiting[dir].empty())
+           export_dir_acked(dir);
+       }
+      }
+    }
+    
+    // next!
+    p = next;
+  }
+}
+
+
+
+
+
+
+// ==========================================================
+// EXPORT
 
 
 class C_MDC_ExportFreeze : public Context {
@@ -223,7 +305,8 @@ public:
   C_MDC_ExportFreeze(Migrator *m, CDir *e, int d) :
        mig(m), ex(e), dest(d) {}
   virtual void finish(int r) {
-    mig->export_dir_frozen(ex, dest);
+    if (r >= 0)
+      mig->export_dir_frozen(ex, dest);
   }
 };
 
@@ -266,8 +349,9 @@ void Migrator::export_dir(CDir *dir,
   }
 
   // ok, let's go.
-
-  exporting.insert(dir);
+  assert(export_state.count(dir) == 0);
+  export_state[dir] = EXPORT_DISCOVERING;
+  export_peer[dir] = dest;
 
   // send ExportDirDiscover (ask target)
   mds->send_message_mds(new MExportDirDiscover(dir->inode), dest, MDS_PORT_MIGRATOR);
@@ -294,6 +378,9 @@ void Migrator::handle_export_dir_discover_ack(MExportDirDiscoverAck *m)
   
   dout(7) << "export_dir_discover_ack from " << m->get_source()
          << " on " << *dir << ", releasing auth_pin" << endl;
+
+  export_state[dir] = EXPORT_FREEZING;
+
   dir->auth_unpin();   // unpin to allow freeze to complete
   
   delete m;  // done
@@ -318,6 +405,7 @@ void Migrator::export_dir_frozen(CDir *dir,
 {
   // subtree is now frozen!
   dout(7) << "export_dir_frozen on " << *dir << " to " << dest << endl;
+  export_state[dir] = EXPORT_LOGGINGSTART;
 
   show_imports();
 
@@ -396,6 +484,17 @@ void Migrator::export_dir_frozen(CDir *dir,
 void Migrator::export_dir_frozen_logged(CDir *dir, MExportDirPrep *prep, int dest)
 {
   dout(7) << "export_dir_frozen_logged " << *dir << endl;
+
+  if (export_state.count(dir) == 0 ||
+      export_state[dir] != EXPORT_LOGGINGSTART) {
+    // export must have aborted.  
+    dout(7) << "export must have aborted, unfreezing and deleting me old prep message" << endl;
+    delete prep;
+    dir->unfreeze_tree();  // cancel the freeze
+    return;
+  }
+
+  export_state[dir] = EXPORT_PREPPING;
   mds->send_message_mds(prep, dest, MDS_PORT_MIGRATOR);
 }
 
@@ -408,7 +507,16 @@ void Migrator::handle_export_dir_prep_ack(MExportDirPrepAck *m)
 
   dout(7) << "export_dir_prep_ack " << *dir << ", starting export" << endl;
   
+  if (export_state.count(dir) == 0 ||
+      export_state[dir] != EXPORT_PREPPING) {
+    // export must have aborted.  
+    dout(7) << "export must have aborted, unfreezing" << endl;
+    dir->unfreeze_tree();
+    return;
+  }
+
   // start export.
+  export_state[dir] = EXPORT_EXPORTING;
   export_dir_go(dir, m->get_source().num());
 
   // done
@@ -431,6 +539,8 @@ void Migrator::export_dir_go(CDir *dir,
   // update imports/exports
   CDir *containing_import = cache->get_auth_container(dir);
 
+  set<CDir*> bounds;
+
   if (containing_import == dir) {
     dout(7) << " i'm rexporting a previous import" << endl;
     assert(dir->is_import());
@@ -446,6 +556,7 @@ void Migrator::export_dir_go(CDir *dir,
 
       // add to export message
       req->add_export(nested);
+      bounds.insert(nested);
       
       // nested beneath our new export *in; remove!
       dout(7) << " export " << *nested << " was nested beneath us; removing from export list(s)" << endl;
@@ -482,6 +593,7 @@ void Migrator::export_dir_go(CDir *dir,
 
         // add to msg
         req->add_export(nested);
+       bounds.insert(nested);
       } else {
         dout(12) << " export " << *nested << " is under other export " << *containing_export << ", which is unrelated" << endl;
         assert(cache->get_auth_container(containing_export) != containing_import);
@@ -489,12 +601,17 @@ void Migrator::export_dir_go(CDir *dir,
     }
   }
 
+
   // note new authority (locally)
   if (dir->inode->authority() == dest)
     dir->set_dir_auth( CDIR_AUTH_PARENT );
   else
     dir->set_dir_auth( dest );
 
+  // note bounds
+  export_bounds[dir].swap(bounds);
+
+
   // make list of nodes i expect an export_dir_notify_ack from
   //  (everyone w/ this dir open, but me!)
   assert(export_notify_ack_waiting[dir].empty());
@@ -513,7 +630,7 @@ void Migrator::export_dir_go(CDir *dir,
   assert(export_notify_ack_waiting[dir].count( dest ));
 
   // fill export message with cache data
-  C_Contexts *fin = new C_Contexts;
+  C_Contexts *fin = new C_Contexts;       // collect all the waiters
   int num_exported_inodes = export_dir_walk( req, 
                                              fin, 
                                              dir,   // base
@@ -776,41 +893,136 @@ void Migrator::handle_export_dir_notify_ack(MExportDirNotifyAck *m)
   assert(export_notify_ack_waiting[dir].count(from));
   export_notify_ack_waiting[dir].erase(from);
 
+  dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from 
+         << ", still need (" << export_notify_ack_waiting[dir] << ")" << endl;
+  
   // done?
   if (export_notify_ack_waiting[dir].empty()) {
-    export_notify_ack_waiting.erase(dir);
-
+    export_dir_acked(dir);
+  } else {
     dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from 
-            << ", last one!" << endl;
+            << ", still waiting for " << export_notify_ack_waiting[dir] << endl;
+  }
+  
+  delete m;
+}
+
 
-    // log export completion, then finish (unfreeze, trigger finish context, etc.)
-    mds->mdlog->submit_entry(new EExportFinish(dir, true),
-                            new C_MDS_ExportFinishLogged(this, dir));
 
+/*
+ * this happens if hte dest failes after i send teh export data but before it is acked
+ * that is, we don't know they safely received and logged it, so we reverse our changes
+ * and go on.
+ */
+void Migrator::reverse_export(CDir *dir)
+{
+  dout(7) << "reverse_export " << *dir << endl;
+  
+  assert(export_state[dir] == EXPORT_EXPORTING);
+  assert(export_bounds.count(dir));
+  
+  set<CDir*> bounds;
+  bounds.swap(export_bounds[dir]);
+  export_bounds.erase(dir);
+
+  // -- adjust dir_auth --
+  // base
+  CDir *im = dir;
+  if (dir->get_inode()->authority() == mds->get_nodeid()) {
+    // parent is already me.  adding to existing import.
+    im = mds->mdcache->get_auth_container(dir);
+    assert(im);
+    mds->mdcache->nested_exports[im].erase(dir);
+    dir->set_dir_auth( CDIR_AUTH_PARENT );     
+    dir->state_set(CDIR_STATE_EXPORT);
+    dir->get(CDir::PIN_EXPORT);
   } else {
-    dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from 
-            << ", still waiting for " << export_notify_ack_waiting[dir] << endl;
+    // parent isn't me.  new import.
+    mds->mdcache->imports.insert(dir);
+    dir->set_dir_auth( mds->get_nodeid() );               
+    dir->state_set(CDIR_STATE_IMPORT);
+    dir->get(CDir::PIN_IMPORT);
   }
 
-  delete m;
+  dout(10) << "  base " << *dir << endl;
+  if (dir != im)
+    dout(10) << "  under " << *im << endl;
+  
+  // bounds
+  for (set<CDir*>::iterator p = bounds.begin();
+       p != bounds.end();
+       ++p) {
+    CDir *bd = *p;
+    
+    if (bd->get_dir_auth() == mds->get_nodeid()) {
+      // still me.  was an import. 
+      mds->mdcache->imports.erase(bd);
+      bd->set_dir_auth( CDIR_AUTH_PARENT );   
+      bd->state_clear(CDIR_STATE_IMPORT);
+      bd->put(CDir::PIN_IMPORT);
+      // move nested exports.
+      for (set<CDir*>::iterator q = mds->mdcache->nested_exports[bd].begin();
+          q != mds->mdcache->nested_exports[bd].end();
+          ++q) 
+       mds->mdcache->nested_exports[im].insert(*q);
+      mds->mdcache->nested_exports.erase(bd);  
+    } else {
+      // not me anymore.  now an export.
+      mds->mdcache->exports.insert(bd);
+      mds->mdcache->nested_exports[im].insert(bd);
+      assert(bd->get_dir_auth() != CDIR_AUTH_PARENT);
+      bd->set_dir_auth( CDIR_AUTH_UNKNOWN );
+      bd->state_set(CDIR_STATE_EXPORT);
+      bd->get(CDir::PIN_EXPORT);
+    }
+    
+    dout(10) << "  bound " << *bd << endl;
+  }
+
+
+  // -- adjust auth bits --
+  
+  
+
+
 }
 
+void Migrator::export_dir_acked(CDir *dir)
+{
+  dout(7) << "export_dir_acked " << *dir << endl;
+  export_notify_ack_waiting.erase(dir);
+  
+  export_state[dir] = EXPORT_LOGGINGFINISH;
+  export_bounds.erase(dir);
+  
+  // log export completion, then finish (unfreeze, trigger finish context, etc.)
+  mds->mdlog->submit_entry(new EExportFinish(dir, true),
+                          new C_MDS_ExportFinishLogged(this, dir));
+}  
+
 
 /*
  * once i get all teh notify_acks i can finish
  */
 void Migrator::export_dir_finish(CDir *dir)
 {
-  // send finish/commit to new auth
-  mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_MIGRATOR);
-  
-  // remove from exporting list
-  exporting.erase(dir);
-  
+  dout(7) << "export_dir_finish " << *dir << endl;
+
+  if (export_state.count(dir)) {
+    // send finish/commit to new auth
+    mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_MIGRATOR);
+
+    // remove from exporting list
+    export_state.erase(dir);
+    export_peer.erase(dir);
+  } else {
+    dout(7) << "target must have failed, not sending final commit message.  export succeeded anyway." << endl;
+  }
+    
   // unfreeze
-  dout(7) << "export_dir_finish " << *dir << ", unfreezing" << endl;
+  dout(7) << "export_dir_finish unfreezing" << endl;
   dir->unfreeze_tree();
-
+  
   // unpin path
   dout(7) << "export_dir_finish unpinning path" << endl;
   vector<CDentry*> trace;
@@ -874,10 +1086,10 @@ void Migrator::export_dir_finish(CDir *dir)
 
 
 
+// ==========================================================
+// IMPORT
 
 
-//  IMPORTS
-
 class C_MDC_ExportDirDiscover : public Context {
   Migrator *mig;
   MExportDirDiscover *m;
@@ -939,6 +1151,9 @@ void Migrator::handle_export_dir_discover_2(MExportDirDiscover *m, CInode *in, i
   
   // pin auth too, until the import completes.
   in->auth_pin();
+
+  import_state[in->ino()] = IMPORT_DISCOVERED;
+
   
   // reply
   dout(7) << " sending export_dir_discover_ack on " << *in << endl;
@@ -993,6 +1208,9 @@ void Migrator::handle_export_dir_prep(MExportDirPrep *m)
     // auth pin too
     dir->auth_pin();
     diri->auth_unpin();
+
+    // change import state
+    import_state[diri->ino()] = IMPORT_PREPPING;
     
     // assimilate traces to exports
     for (list<CInodeDiscover*>::iterator it = m->get_inodes().begin();
@@ -1040,6 +1258,9 @@ void Migrator::handle_export_dir_prep(MExportDirPrep *m)
       CInode *in = cache->get_inode(*it);
       assert(in);
       
+      // note bound.
+      import_bounds[dir->ino()].insert(*it);
+
       if (!in->dir) {
         dout(7) << "  opening nested export on " << *in << endl;
         cache->open_remote_dir(in,
@@ -1089,7 +1310,10 @@ void Migrator::handle_export_dir_prep(MExportDirPrep *m)
     dout(7) << " all ready, sending export_dir_prep_ack on " << *dir << endl;
     mds->send_message_mds(new MExportDirPrepAck(dir->ino()),
                          m->get_source().num(), MDS_PORT_MIGRATOR);
-    
+
+    // note new state
+    import_state[diri->ino()] = IMPORT_PREPPED;
+
     // done 
     delete m;
   }
@@ -1216,8 +1440,8 @@ void Migrator::handle_export_dir(MExportDir *m)
     // mark export point frozenleaf
     ex->get(CDir::PIN_FREEZELEAF);
     ex->state_set(CDIR_STATE_FROZENTREELEAF);
-    import_freeze_leaves[dir->ino()].insert(ex);  // and take note!
-
+    assert(import_bounds[dir->ino()].count(*it));    // we took note during prep stage
+    
     // remove our pin
     ex->put(CDir::PIN_IMPORTINGEXPORT);
     ex->state_clear(CDIR_STATE_IMPORTINGEXPORT);
@@ -1281,11 +1505,16 @@ void Migrator::handle_export_dir(MExportDir *m)
   // adjust popularity
   mds->balancer->add_import(dir);
 
+  dout(7) << "handle_export_dir did " << *dir << endl;
+
   // log it
   mds->mdlog->submit_entry(le,
                           new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num(), 
                                                          imported_subdirs, m->get_exports()));
 
+  // note state
+  import_state[dir->ino()] = IMPORT_LOGGINGSTART;
+
   // some stats
   if (mds->logger) {
     mds->logger->inc("im");
@@ -1303,7 +1532,8 @@ void Migrator::import_dir_logged_start(CDir *dir, int from,
 {
   dout(7) << "import_dir_logged " << *dir << endl;
 
-  assert(0); // test die
+  // note state
+  import_state[dir->ino()] = IMPORT_ACKING;
 
   // send notify's etc.
   dout(7) << "sending notifyack for " << *dir << " to old auth mds" << from << endl;
@@ -1349,6 +1579,9 @@ void Migrator::handle_export_dir_finish(MExportDirFinish *m)
   dout(7) << "handle_export_dir_finish logging import_finish on " << *dir << endl;
   assert(dir->is_auth());
 
+  // note state
+  import_state[dir->ino()] = IMPORT_LOGGINGFINISH;
+
   // log
   mds->mdlog->submit_entry(new EImportFinish(dir, true),
                           new C_MDS_ImportDirLoggedFinish(this,dir));
@@ -1357,34 +1590,38 @@ void Migrator::handle_export_dir_finish(MExportDirFinish *m)
 
 void Migrator::import_dir_logged_finish(CDir *dir)
 {
-  dout(7) << "import_dir_finish" << endl;
-
-  dout(5) << "done with import of " << *dir << endl;
-  show_imports();
-  if (mds->logger) {
-    mds->logger->set("nex", cache->exports.size());
-    mds->logger->set("nim", cache->imports.size());
-  }
+  dout(7) << "import_dir_logged_finish " << *dir << endl;
 
   // un auth pin (other exports can now proceed)
   dir->auth_unpin();  
   
   // unfreeze!
-  for (set<CDir*>::iterator p = import_freeze_leaves[dir->ino()].begin();
-       p != import_freeze_leaves[dir->ino()].end();
+  for (set<inodeno_t>::iterator p = import_bounds[dir->ino()].begin();
+       p != import_bounds[dir->ino()].end();
        ++p) {
-    (*p)->put(CDir::PIN_FREEZELEAF);
-    (*p)->state_clear(CDIR_STATE_FROZENTREELEAF);
+    CInode *diri = mds->mdcache->get_inode(*p);
+    CDir *dir = diri->dir;
+    assert(dir->state_test(CDIR_STATE_FROZENTREELEAF));
+    dir->put(CDir::PIN_FREEZELEAF);
+    dir->state_clear(CDIR_STATE_FROZENTREELEAF);
   }
-  import_freeze_leaves.erase(dir->ino());
 
   dir->unfreeze_tree();
       
+  // clear import state (we're done!)
+  import_state.erase(dir->ino());
+  import_bounds.erase(dir->ino());
 
   // ok now finish contexts
   dout(5) << "finishing any waiters on imported data" << endl;
   dir->finish_waiting(CDIR_WAIT_IMPORTED);
 
+  // log it
+  if (mds->logger) {
+    mds->logger->set("nex", cache->exports.size());
+    mds->logger->set("nim", cache->imports.size());
+  }
+  show_imports();
 
   // is it empty?
   if (dir->get_size() == 0 &&
@@ -1510,10 +1747,11 @@ int Migrator::import_dir_block(bufferlist& bl,
   // add to journal entry
   le->metablob.add_dir(dir, true);  // Hmm: false would be okay in some cases
 
+  int num_imported = 0;
+
   if (dir->is_hashed()) {
 
     // do nothing; dir is hashed
-    return 0;
   } else {
     // take all waiters on this dir
     // NOTE: a pass of imported data is guaranteed to get all of my waiters because
@@ -1529,7 +1767,6 @@ int Migrator::import_dir_block(bufferlist& bl,
     dout(15) << "doing contents" << endl;
     
     // contents
-    int num_imported = 0;
     long nden = dstate.get_nden();
 
     for (; nden>0; nden--) {
@@ -1586,8 +1823,10 @@ int Migrator::import_dir_block(bufferlist& bl,
       le->metablob.add_dentry(dn, true);  // Hmm: might we do dn->is_dirty() here instead?  
     }
 
-    return num_imported;
   }
+
+  dout(7) << " import_dir_block done " << *dir << endl;
+  return num_imported;
 }
 
 
index 033885b4e4f7ce66f126e2110ea8c6793a751cf6..a438e8bdb785515eeb0e285f94654e05026a7eef 100644 (file)
@@ -61,9 +61,20 @@ private:
   MDS *mds;
   MDCache *cache;
 
+  // -- exports --
+  // export stages.  used to clean up intelligently if there's a failure.
+  const static int EXPORT_DISCOVERING   = 1;  // dest is disovering export dir
+  const static int EXPORT_FREEZING      = 2;  // we're freezing the dir tree
+  const static int EXPORT_LOGGINGSTART  = 3;  // we're logging EExportStart
+  const static int EXPORT_PREPPING      = 4;  // sending dest spanning tree to export bounds
+  const static int EXPORT_EXPORTING     = 5;  // sent actual export, waiting for acks
+  const static int EXPORT_LOGGINGFINISH = 6; // logging EExportFinish
+  
   // export fun
-  set<CDir*>             exporting;
-  map<CDir*, set<int> >  export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from
+  map<CDir*, int>              export_state;
+  map<CDir*, int>              export_peer;
+  map<CDir*, set<CDir*> >      export_bounds;
+  map<CDir*, set<int> >        export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from
   map<CDir*, list<inodeno_t> > export_proxy_inos;
   map<CDir*, list<inodeno_t> > export_proxy_dirinos;
   
@@ -72,29 +83,56 @@ private:
   set<inodeno_t>                    stray_export_warnings; // notifies i haven't seen
   map<inodeno_t, MExportDirNotify*> stray_export_notifies;
   
-  // import muck
-  map<inodeno_t, set<CDir*> > import_freeze_leaves;
 
-  // hashing madness
+  // -- imports --
+  const static int IMPORT_DISCOVERED    = 1; // waiting for prep
+  const static int IMPORT_PREPPING      = 2; // opening dirs on bounds
+  const static int IMPORT_PREPPED       = 3; // opened bounds, waiting for import
+  const static int IMPORT_LOGGINGSTART  = 3; // got import, logging EImportStart
+  const static int IMPORT_ACKING        = 4; // logged, sent acks
+  const static int IMPORT_LOGGINGFINISH = 5;
+
+  map<inodeno_t,int>             import_state;
+  map<inodeno_t,set<inodeno_t> > import_bounds;
+
+
+  // -- hashing madness --
   multimap<CDir*, int>   unhash_waiting;  // nodes i am waiting for UnhashDirAck's from
   multimap<inodeno_t, inodeno_t>    import_hashed_replicate_waiting;  // nodes i am waiting to discover to complete my import of a hashed dir
   // maps frozen_dir_ino's to waiting-for-discover ino's.
   multimap<inodeno_t, inodeno_t>    import_hashed_frozen_waiting;    // dirs i froze (for the above)
 
+
+
 public:
   // -- cons --
   Migrator(MDS *m, MDCache *c) : mds(m), cache(c) {}
 
   void dispatch(Message*);
 
-  //bool is_importing();
-  bool is_exporting(CDir *dir = 0) {
-    if (dir)
-      return exporting.count(dir);
-    else 
-      return !exporting.empty();
+  
+  // -- status --
+  int is_exporting(CDir *dir) {
+    if (export_state.count(dir)) return export_state[dir];
+    return 0;
+  }
+  bool is_exporting() { return !export_state.empty(); }
+  int is_importing(inodeno_t dirino) {
+    if (import_state.count(dirino)) return import_state[dirino];
+    return 0;
+  }
+  bool is_importing() { return !import_state.empty(); }
+  const set<inodeno_t>& get_import_bounds(inodeno_t base) { 
+    assert(import_bounds.count(base));
+    return import_bounds[base];
   }
 
+
+  // -- misc --
+  void handle_mds_failure(int who);
+  void show_imports();
+
+
   // -- import/export --
   // exporter
  public:
@@ -121,9 +159,11 @@ public:
                       CDir *basedir,
                       CDir *dir,
                       int newauth);
-  void export_dir_finish(CDir *dir);
   void handle_export_dir_notify_ack(MExportDirNotifyAck *m);
-    
+  void reverse_export(CDir *dir);
+  void export_dir_acked(CDir *dir);
+  void export_dir_finish(CDir *dir);
+
   friend class C_MDC_ExportFreeze;
   friend class C_MDC_ExportStartLogged;
   friend class C_MDS_ExportFinishLogged;
@@ -155,7 +195,6 @@ public:
   void handle_export_dir_warning(MExportDirWarning *m);
   void handle_export_dir_notify(MExportDirNotify *m);
 
-  void show_imports();
 
   // -- hashed directories --
 
index 7d5554c9ca8b56dafde595052326e5a494f60c65..2789e30844743274c8ddd949726d6b6842e90c62 100644 (file)
 
 #include "include/types.h"
 
+// sent from replica to auth
 
 class MMDSCacheRejoin : public Message {
-  
-  struct InodeState {
-       int hardlock;     // hardlock state
-       int filelock;     // filelock state
-       int caps_wanted;  // what caps bits i want
-       InodeState(int cw=0, int hl=-1, int fl=-1) : hardlock(hl), filelock(fl), caps_wanted(cw) {}
-  };
-
-  map<inodeno_t, InodeState> inodes;
-  map<inodeno_t, map<string, int> > dentries;
+ public:
+  map<inodeno_t,int> inodes; // ino -> caps_wanted
   set<inodeno_t> dirs;
+  map<inodeno_t, set<string> > dentries;   // dir -> (dentries...)
 
- public:
   MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {}
 
   char *get_type_name() { return "cache_rejoin"; }
@@ -42,23 +35,27 @@ class MMDSCacheRejoin : public Message {
   }
 
   void add_dir(inodeno_t dirino) {
-       dirs.insert(dirino);
+    dirs.insert(dirino);
   }
-  void add_dentry(inodeno_t dirino, const string& dn, int ls) {
-       dentries[dirino][dn] = ls;
+  void add_dentry(inodeno_t dirino, const string& dn) {
+    dentries[dirino].insert(dn);
   }
-  void add_inode(inodeno_t ino, int hl, int fl, int cw) {
-       inodes[ino] = InodeState(cw,hl,fl);
+  void add_inode(inodeno_t ino, int cw) {
+    inodes[ino] = cw;
   }
   
   void encode_payload() {
-       ::_encode(inodes, payload);
-       ::_encode(dirs, payload);
+    ::_encode(inodes, payload);
+    ::_encode(dirs, payload);
+    for (set<inodeno_t>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+      ::_encode(dentries[*p], payload);
   }
   void decode_payload() {
-       int off = 0;
-       ::_decode(inodes, payload, off);
-       ::_decode(dirs, payload, off);
+    int off = 0;
+    ::_decode(inodes, payload, off);
+    ::_decode(dirs, payload, off);
+    for (set<inodeno_t>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+      ::_decode(dentries[*p], payload, off);
   }
 };
 
diff --git a/branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h b/branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h
new file mode 100644 (file)
index 0000000..b8f0d23
--- /dev/null
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MMDSCACHEREJOINACK_H
+#define __MMDSCACHEREJOINACK_H
+
+#include "msg/Message.h"
+
+#include "include/types.h"
+
+// sent from auth back to replica
+
+class MMDSCacheRejoinAck : public Message {
+ public:
+  struct inodeinfo { 
+    inodeno_t ino;
+    int hardlock;
+    int filelock;
+    int nonce;
+    inodeinfo() {}
+    inodeinfo(inodeno_t i, int h, int f, int n) : ino(i), hardlock(h), filelock(f), nonce(n) {}
+  };
+  struct dninfo {
+    int lock;
+    int nonce;
+    dninfo() {}
+    dninfo(int l, int n) : lock(l), nonce(n) {}
+  };
+  struct dirinfo {
+    inodeno_t dirino;
+    int nonce;
+    dirinfo() {}
+    dirinfo(inodeno_t i, int n) : dirino(i), nonce(n) {}
+  };
+  list<inodeinfo> inodes; 
+  map<inodeno_t, map<string,dninfo> > dentries;
+  list<dirinfo> dirs;
+
+  MMDSCacheRejoinAck() : Message(MSG_MDS_CACHEREJOINACK) {}
+
+  char *get_type_name() { return "cache_rejoin_ack"; }
+
+  void print(ostream& out) {
+    out << "cache_rejoin" << endl;
+  }
+
+  void add_dir(inodeno_t dirino, int nonce) {
+    dirs.push_back(dirinfo(dirino,nonce));
+  }
+  void add_dentry(inodeno_t dirino, const string& dn, int ls, int nonce) {
+    dentries[dirino][dn] = dninfo(ls, nonce);
+  }
+  void add_inode(inodeno_t ino, int hl, int fl, int nonce) {
+    inodes.push_back(inodeinfo(ino, hl, fl, nonce));
+  }
+  
+  void encode_payload() {
+    ::_encode(inodes, payload);
+    ::_encode(dirs, payload);
+    for (list<dirinfo>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+      ::_encode(dentries[p->dirino], payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    ::_decode(inodes, payload, off);
+    ::_decode(dirs, payload, off);
+    for (list<dirinfo>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+      ::_decode(dentries[p->dirino], payload, off);
+ }
+};
+
+#endif
index 3469f1b4f9007c30ff05fe2e2fa49c7d78d44cbd..22774cdabc2eccf92e1cf67a22bdf7a6537c28e9 100644 (file)
@@ -41,7 +41,7 @@ class MMDSImportMap : public Message {
     imap[im].insert(ex);
   }
 
-  void add_ambiguous_import(inodeno_t im, set<inodeno_t>& m) {
+  void add_ambiguous_import(inodeno_t im, const set<inodeno_t>& m) {
     ambiguous_imap[im] = m;
   }
 
index c03c44243891aeef8ea2db5212b4d1262fac2243..3fca23c9215c5c1c399a658604587c28dbdc1c5b 100644 (file)
@@ -262,7 +262,7 @@ void MDSMonitor::tick()
            break;
 
          case MDSMap::STATE_STANDBY:
-           if (mdsmap.is_created(*p))
+           if (mdsmap.has_created(*p))
              newstate = MDSMap::STATE_OUT;
            else
              newstate = MDSMap::STATE_DNE;
index 7576a56696a861496dd5152954129e72c0ff4bad..1f563a7cdaddd36297377c3e5657ee689cd80fbb 100644 (file)
@@ -53,6 +53,7 @@ using namespace std;
 #include "messages/MMDSBeacon.h"
 #include "messages/MMDSImportMap.h"
 #include "messages/MMDSCacheRejoin.h"
+#include "messages/MMDSCacheRejoinAck.h"
 
 #include "messages/MDirUpdate.h"
 #include "messages/MDiscover.h"
@@ -256,6 +257,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_MDS_CACHEREJOIN:
        m = new MMDSCacheRejoin;
        break;
+  case MSG_MDS_CACHEREJOINACK:
+       m = new MMDSCacheRejoinAck;
+       break;
 
   case MSG_MDS_DIRUPDATE:
     m = new MDirUpdate();
index 7d9c1d8bae56ce526b104f5b9572f4736c701d2b..523ff70932cc91b2e7f1900af43727c261a725b6 100644 (file)
@@ -94,6 +94,7 @@
 
 #define MSG_MDS_IMPORTMAP          106
 #define MSG_MDS_CACHEREJOIN        107
+#define MSG_MDS_CACHEREJOINACK     108
 
 #define MSG_MDS_DISCOVER           110
 #define MSG_MDS_DISCOVERREPLY      111