]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
some rejoin fixes, export throttling during shutdown (hack)
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Oct 2007 22:06:52 +0000 (22:06 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Oct 2007 22:06:52 +0000 (22:06 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1881 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/mds/mds/MDCache.cc
branches/sage/mds/mds/MDCache.h
branches/sage/mds/mds/MDLog.cc
branches/sage/mds/mds/MDS.cc
branches/sage/mds/mds/journal.cc

index 575c8f726b2b8ddf1109935624fb0f5570192d13..0dee3045bafcb384a81e611ae8baac7d99046751 100644 (file)
@@ -1128,11 +1128,13 @@ void MDCache::handle_mds_failure(int who)
   // make note of recovery set
   mds->mdsmap->get_recovery_mds_set(recovery_set);
   recovery_set.erase(mds->get_nodeid());
-  dout(1) << "my recovery peers will be " << recovery_set << dendl;
+  dout(1) << "handle_mds_failure mds" << who << " : recovery peers are " << recovery_set << dendl;
 
   // adjust my recovery lists
   wants_resolve.erase(who);   // MDS will ask again
   got_resolve.erase(who);     // i'll get another.
+
+  rejoin_sent.erase(who);        // i need to send another
   rejoin_ack_gather.erase(who);  // i'll need/get another.
  
   // tell the migrator too.
@@ -1325,6 +1327,7 @@ void MDCache::handle_resolve(MMDSResolve *m)
          dout(7) << "ambiguous import succeeded on " << *dir << dendl;
          migrator->import_finish(dir);
        }
+       my_ambiguous_imports.erase(p);  // no longer ambiguous.
       }
       p = next;
     }
@@ -1638,6 +1641,10 @@ void MDCache::recalc_auth_bits()
 
 /*
  * rejoin phase!
+ *
+ * this initiates rejoin.  it shoudl be called before we get any
+ * rejoin or rejoin_ack messages (or else mdsmap distribution is broken).
+ *
  * we start out by sending rejoins to everyone in the recovery set.
  *
  * if we are rejoin, send for all regions in our cache.
@@ -1662,6 +1669,7 @@ void MDCache::rejoin_send_rejoins()
        p != recovery_set.end();
        ++p) {
     if (*p == mds->get_nodeid())  continue;  // nothing to myself!
+    if (rejoin_sent.count(*p)) continue;     // already sent a rejoin to this node!
     if (mds->is_rejoin()) {
       rejoin_gather.insert(*p);
       rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_WEAK);
@@ -1691,7 +1699,7 @@ void MDCache::rejoin_send_rejoins()
   }
 
   if (!mds->is_rejoin()) {
-    // strong.
+    // i am survivor.  send strong rejoin.
     // note request authpins, xlocks
     for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
         p != active_requests.end();
@@ -1736,17 +1744,19 @@ void MDCache::rejoin_send_rejoins()
   }
 
   // send the messages
-  assert(rejoin_ack_gather.empty());
   for (map<int,MMDSCacheRejoin*>::iterator p = rejoins.begin();
        p != rejoins.end();
        ++p) {
-    mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+    assert(rejoin_sent.count(p->first) == 0);
+    assert(rejoin_ack_gather.count(p->first) == 0);
+    rejoin_sent.insert(p->first);
     rejoin_ack_gather.insert(p->first);
+    mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
   }
 
   // nothing?
   if (mds->is_rejoin() && rejoins.empty()) {
-    dout(10) << "nothing left to rejoin" << dendl;
+    dout(10) << "nothing to rejoin" << dendl;
     mds->rejoin_done();
   }
 }
@@ -2005,14 +2015,13 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
     dout(10) << " got dirty inode scatterlock content " << *in << dendl;
   }
 
-  if (survivor)
-    rejoin_scour_survivor_replicas(from, ack);
-
   if (survivor) {
-    // send ack
+    // survivor.  do everything now.
+    rejoin_scour_survivor_replicas(from, ack);
     mds->send_message_mds(ack, from, MDS_PORT_CACHE);
   } else {
     // done?
+    assert(rejoin_gather.count(from));
     rejoin_gather.erase(from);
     if (rejoin_gather.empty()) {
       rejoin_gather_finish();
@@ -2291,9 +2300,11 @@ void MDCache::handle_cache_rejoin_strong(MMDSCacheRejoin *strong)
 
   // send missing?
   if (missing) {
+    // we expect a FULL soon.
     mds->send_message_mds(missing, from, MDS_PORT_CACHE);
   } else {
     // done?
+    assert(rejoin_gather.count(from));
     rejoin_gather.erase(from);
     if (rejoin_gather.empty()) {
       rejoin_gather_finish();
@@ -2386,6 +2397,7 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
   }
 
   // done?
+  assert(rejoin_ack_gather.count(from));
   rejoin_ack_gather.erase(from);
   if (mds->is_rejoin() && 
       rejoin_gather.empty() &&     // make sure we've gotten our FULL inodes, too.
@@ -2450,6 +2462,7 @@ void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *full)
   }
 
   // done?
+  assert(rejoin_gather.count(from));
   rejoin_gather.erase(from);
   if (rejoin_gather.empty()) {
     rejoin_gather_finish();
@@ -2525,6 +2538,8 @@ public:
   }
 };
 
+
+
 void MDCache::rejoin_gather_finish() 
 {
   dout(10) << "rejoin_gather_finish" << dendl;
@@ -3490,10 +3505,12 @@ bool MDCache::shutdown_pass()
       if (!dir->is_full_dir_auth()) continue;
       ls.push_back(dir);
     }
+    int max = 5; // throttle shutdown exports.. hack!
     for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
       CDir *dir = *p;
       dout(7) << "sending " << *dir << " back to mds0" << dendl;
       migrator->export_dir(dir, 0);
+      if (--max == 0) break;
     }
   }
 
index a6f11a89a574a87d2f21944d23580594668f1cfd..56fd38faec1a137b89b3eabb7d6cf16a9e588f3a 100644 (file)
@@ -383,6 +383,7 @@ public:
 protected:
   // [rejoin]
   set<int> rejoin_gather;      // nodes from whom i need a rejoin
+  set<int> rejoin_sent;        // nodes i sent a rejoin to
   set<int> rejoin_ack_gather;  // nodes from whom i need a rejoin ack
 
   map<inodeno_t,map<int,inode_caps_reconnect_t> > cap_exports; // ino -> client -> capex
index 3717ee1e39adf58c09f493483dd5a92d71598152..0b069bc71f9072c541e44fb4f1554326e697e943 100644 (file)
@@ -54,6 +54,7 @@ void MDLog::reopen_logger(utime_t start, bool append)
     didit = true;
     mdlog_logtype.add_inc("evadd");
     mdlog_logtype.add_inc("evtrm");
+    mdlog_logtype.add_set("evtrmg");
     mdlog_logtype.add_set("ev");
     mdlog_logtype.add_inc("segadd");
     mdlog_logtype.add_inc("segtrm");
@@ -313,6 +314,7 @@ void MDLog::try_trim(LogSegment *ls)
   }
   
   logger->set("segtrmg", trimming_segments.size());
+  logger->set("evtrmg", trimming_events);
 }
 
 void MDLog::_maybe_trimmed(LogSegment *ls) 
index c097376c6408aee8713141836626368f445f2ae4..9d97f138d9955c839eead43420fe41da9fd6bc13 100644 (file)
@@ -512,6 +512,7 @@ void MDS::handle_mds_map(MMDSMap *m)
   mdsmap->get_mds_set(oldcreating, MDSMap::STATE_CREATING);
   set<int> oldstopped;
   mdsmap->get_mds_set(oldstopped, MDSMap::STATE_STOPPED);
+  bool wasdegraded = mdsmap->is_degraded();
 
   // decode and process
   mdsmap->decode(m->get_encoded());
@@ -620,6 +621,8 @@ void MDS::handle_mds_map(MMDSMap *m)
        wasrejoining && !mdsmap->is_rejoining()) 
       mdcache->dump_cache();      // for DEBUG only
   }
+  if (wasdegraded && !mdsmap->is_degraded()) 
+    dout(1) << "cluster recovered." << dendl;
 
   // did someone go active?
   if (is_active() || is_stopping()) {
@@ -1003,7 +1006,7 @@ void MDS::handle_mds_recovery(int who)
 void MDS::stopping_start()
 {
   dout(2) << "stopping_start" << dendl;
-  
+
   // start cache shutdown
   mdcache->shutdown_start();
   
index 7e0b9f0f9474c10a63bb3c8a40849c22c8120a6f..7d843a99a0311039145e53fe98f75fd0956da530 100644 (file)
@@ -102,7 +102,21 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
     if (!gather) gather = new C_Gather;
     (*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
   }
-  
+
+  // idalloc
+  if (allocv > mds->idalloc->get_committed_version()) {
+    dout(10) << " saving idalloc table, need " << allocv << dendl;
+    if (!gather) gather = new C_Gather;
+    mds->idalloc->save(gather->new_sub(), allocv);
+  }
+
+  // clientmap
+  if (clientmapv > mds->clientmap.get_committed()) {
+    dout(10) << " saving clientmap, need " << clientmapv << dendl;
+    if (!gather) gather = new C_Gather;
+    mds->clientmap.save(gather->new_sub(), clientmapv);
+  }
+
   // pending commit atids
   for (hash_set<version_t>::iterator p = pending_commit_atids.begin();
        p != pending_commit_atids.end();
@@ -113,6 +127,17 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
             << " pending commit (not yet acked), waiting" << dendl;
     mds->anchorclient->wait_for_ack(*p, gather->new_sub());
   }
+  
+  // anchortable
+  if (anchortablev > mds->anchortable->get_committed_version()) {
+    dout(10) << " saving anchor table, need " << anchortablev << dendl;
+    if (!gather) gather = new C_Gather;
+    mds->anchortable->save(gather->new_sub());
+  }
+
+  // FIXME client requests...?
+  // audit handling of anchor transactions?
+  // open files?
 
   return gather;
 }
@@ -662,28 +687,6 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg)
 
 // -----------------------
 // ESession
-bool ESession::has_expired(MDS *mds) 
-{
-  if (mds->clientmap.get_committed() >= cmapv) {
-    dout(10) << "ESession.has_expired newer clientmap " << mds->clientmap.get_committed() 
-            << " >= " << cmapv << " has committed" << dendl;
-    return true;
-  } else if (mds->clientmap.get_committing() >= cmapv) {
-    dout(10) << "ESession.has_expired newer clientmap " << mds->clientmap.get_committing() 
-            << " >= " << cmapv << " is still committing" << dendl;
-    return false;
-  } else {
-    dout(10) << "ESession.has_expired clientmap " << mds->clientmap.get_version() 
-            << " > " << cmapv << ", need to save" << dendl;
-    return false;
-  }
-}
-
-void ESession::expire(MDS *mds, Context *c)
-{  
-  dout(10) << "ESession.expire saving clientmap" << dendl;
-  mds->clientmap.save(c, cmapv);
-}
 
 void ESession::update_segment()
 {
@@ -719,26 +722,6 @@ void ESession::replay(MDS *mds)
 // -----------------------
 // EAnchor
 
-bool EAnchor::has_expired(MDS *mds) 
-{
-  version_t cv = mds->anchortable->get_committed_version();
-  if (cv < version) {
-    dout(10) << "EAnchor.has_expired v " << version << " > " << cv
-            << ", still dirty" << dendl;
-    return false;   // still dirty
-  } else {
-    dout(10) << "EAnchor.has_expired v " << version << " <= " << cv
-            << ", already flushed" << dendl;
-    return true;    // already flushed
-  }
-}
-
-void EAnchor::expire(MDS *mds, Context *c)
-{
-  dout(10) << "EAnchor.expire saving anchor table" << dendl;
-  mds->anchortable->save(c);
-}
-
 void EAnchor::update_segment()
 {
   _segment->anchortablev = version;
@@ -780,16 +763,6 @@ void EAnchor::replay(MDS *mds)
 
 // EAnchorClient
 
-bool EAnchorClient::has_expired(MDS *mds) 
-{
-  return true;
-}
-
-void EAnchorClient::expire(MDS *mds, Context *c)
-{
-  assert(0);
-}
-
 void EAnchorClient::replay(MDS *mds)
 {
   dout(10) << " EAnchorClient.replay op " << op << " atid " << atid << dendl;