// make note of recovery set
mds->mdsmap->get_recovery_mds_set(recovery_set);
recovery_set.erase(mds->get_nodeid());
- dout(1) << "my recovery peers will be " << recovery_set << dendl;
+ dout(1) << "handle_mds_failure mds" << who << " : recovery peers are " << recovery_set << dendl;
// adjust my recovery lists
wants_resolve.erase(who); // MDS will ask again
got_resolve.erase(who); // i'll get another.
+
+ rejoin_sent.erase(who); // i need to send another
rejoin_ack_gather.erase(who); // i'll need/get another.
// tell the migrator too.
dout(7) << "ambiguous import succeeded on " << *dir << dendl;
migrator->import_finish(dir);
}
+ my_ambiguous_imports.erase(p); // no longer ambiguous.
}
p = next;
}
/*
* rejoin phase!
+ *
+ * this initiates rejoin. it should be called before we get any
+ * rejoin or rejoin_ack messages (or else mdsmap distribution is broken).
+ *
* we start out by sending rejoins to everyone in the recovery set.
*
 * if we are in the rejoin state, send for all regions in our cache.
p != recovery_set.end();
++p) {
if (*p == mds->get_nodeid()) continue; // nothing to myself!
+ if (rejoin_sent.count(*p)) continue; // already sent a rejoin to this node!
if (mds->is_rejoin()) {
rejoin_gather.insert(*p);
rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_WEAK);
}
if (!mds->is_rejoin()) {
- // strong.
+ // i am survivor. send strong rejoin.
// note request authpins, xlocks
for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
p != active_requests.end();
}
// send the messages
- assert(rejoin_ack_gather.empty());
for (map<int,MMDSCacheRejoin*>::iterator p = rejoins.begin();
p != rejoins.end();
++p) {
- mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+ assert(rejoin_sent.count(p->first) == 0);
+ assert(rejoin_ack_gather.count(p->first) == 0);
+ rejoin_sent.insert(p->first);
rejoin_ack_gather.insert(p->first);
+ mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
}
// nothing?
if (mds->is_rejoin() && rejoins.empty()) {
- dout(10) << "nothing left to rejoin" << dendl;
+ dout(10) << "nothing to rejoin" << dendl;
mds->rejoin_done();
}
}
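// --- illustrative sketch, not part of the patch ---
// The send loop above only emits a rejoin to peers not already in
// rejoin_sent, and records every send in both rejoin_sent and
// rejoin_ack_gather.  A minimal stand-alone model of that bookkeeping,
// using plain std:: containers (the peer ids and the re-entry scenario
// below are hypothetical, not taken from the MDS code):
#include <cassert>
#include <set>

struct rejoin_bookkeeping {
  std::set<int> rejoin_sent;        // peers we already sent a rejoin to
  std::set<int> rejoin_ack_gather;  // peers whose ack we still need

  // returns true iff a rejoin message should actually go out
  bool send_rejoin(int peer) {
    if (rejoin_sent.count(peer))
      return false;                 // already sent one; don't repeat
    assert(rejoin_ack_gather.count(peer) == 0);
    rejoin_sent.insert(peer);
    rejoin_ack_gather.insert(peer);
    return true;
  }

  // called when peer's rejoin_ack arrives
  void got_ack(int peer) {
    assert(rejoin_ack_gather.count(peer));
    rejoin_ack_gather.erase(peer);
  }
};

int main() {
  rejoin_bookkeeping b;
  assert(b.send_rejoin(1));         // first call sends
  assert(!b.send_rejoin(1));        // re-entry is a no-op
  b.got_ack(1);
  assert(b.rejoin_ack_gather.empty());
  return 0;
}
// --- end sketch ---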
dout(10) << " got dirty inode scatterlock content " << *in << dendl;
}
- if (survivor)
- rejoin_scour_survivor_replicas(from, ack);
-
if (survivor) {
- // send ack
+ // survivor. do everything now.
+ rejoin_scour_survivor_replicas(from, ack);
mds->send_message_mds(ack, from, MDS_PORT_CACHE);
} else {
// done?
+ assert(rejoin_gather.count(from));
rejoin_gather.erase(from);
if (rejoin_gather.empty()) {
rejoin_gather_finish();
// send missing?
if (missing) {
+ // we expect a FULL soon.
mds->send_message_mds(missing, from, MDS_PORT_CACHE);
} else {
// done?
+ assert(rejoin_gather.count(from));
rejoin_gather.erase(from);
if (rejoin_gather.empty()) {
rejoin_gather_finish();
}
// done?
+ assert(rejoin_ack_gather.count(from));
rejoin_ack_gather.erase(from);
if (mds->is_rejoin() &&
rejoin_gather.empty() && // make sure we've gotten our FULL inodes, too.
}
// done?
+ assert(rejoin_gather.count(from));
rejoin_gather.erase(from);
if (rejoin_gather.empty()) {
rejoin_gather_finish();
}
};
+
+
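// --- illustrative sketch, not part of the patch ---
// The rejoin handlers above all follow the same pattern: assert the sender
// is still in the gather set, erase it, and kick off the next phase once
// the set drains.  A tiny stand-alone model of that "gather until empty"
// idiom (the peer ids and on_finish hook are hypothetical):
#include <cassert>
#include <functional>
#include <set>

struct gather_set {
  std::set<int> waiting_for;
  std::function<void()> on_finish;

  void expect(int peer) { waiting_for.insert(peer); }

  void got(int peer) {
    assert(waiting_for.count(peer)); // a second message from peer is a bug
    waiting_for.erase(peer);
    if (waiting_for.empty() && on_finish)
      on_finish();                   // e.g. rejoin_gather_finish()
  }
};

int main() {
  bool finished = false;
  gather_set g;
  g.on_finish = [&] { finished = true; };
  g.expect(1);
  g.expect(2);
  g.got(2);
  assert(!finished);                 // still waiting for mds1
  g.got(1);
  assert(finished);                  // all rejoins received
  return 0;
}
// --- end sketch ---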
void MDCache::rejoin_gather_finish()
{
dout(10) << "rejoin_gather_finish" << dendl;
if (!dir->is_full_dir_auth()) continue;
ls.push_back(dir);
}
+ int max = 5; // throttle shutdown exports.. hack!
for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
CDir *dir = *p;
dout(7) << "sending " << *dir << " back to mds0" << dendl;
migrator->export_dir(dir, 0);
+ if (--max == 0) break;
}
}
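// --- illustrative sketch, not part of the patch ---
// The max = 5 cap above is a per-pass throttle: only the first few dirs in
// the list are exported now, and the remainder are picked up again on a
// later shutdown pass.  A minimal model of that pattern (the int items and
// the per-pass driver are hypothetical stand-ins):
#include <cassert>
#include <list>

// returns how many items were processed this pass
static int process_some(std::list<int>& todo, int max_per_pass) {
  int done = 0;
  while (!todo.empty() && done < max_per_pass) {
    todo.pop_front();   // stands in for migrator->export_dir(dir, 0)
    ++done;
  }
  return done;
}

int main() {
  std::list<int> dirs = {1, 2, 3, 4, 5, 6, 7};
  assert(process_some(dirs, 5) == 5);  // first pass exports 5
  assert(process_some(dirs, 5) == 2);  // next pass drains the rest
  return 0;
}
// --- end sketch ---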
protected:
// [rejoin]
set<int> rejoin_gather; // nodes from whom i need a rejoin
+ set<int> rejoin_sent; // nodes i sent a rejoin to
set<int> rejoin_ack_gather; // nodes from whom i need a rejoin ack
map<inodeno_t,map<int,inode_caps_reconnect_t> > cap_exports; // ino -> client -> capex
didit = true;
mdlog_logtype.add_inc("evadd");
mdlog_logtype.add_inc("evtrm");
+ mdlog_logtype.add_set("evtrmg");
mdlog_logtype.add_set("ev");
mdlog_logtype.add_inc("segadd");
mdlog_logtype.add_inc("segtrm");
}
logger->set("segtrmg", trimming_segments.size());
+ logger->set("evtrmg", trimming_events);
}
void MDLog::_maybe_trimmed(LogSegment *ls)
mdsmap->get_mds_set(oldcreating, MDSMap::STATE_CREATING);
set<int> oldstopped;
mdsmap->get_mds_set(oldstopped, MDSMap::STATE_STOPPED);
+ bool wasdegraded = mdsmap->is_degraded();
// decode and process
mdsmap->decode(m->get_encoded());
wasrejoining && !mdsmap->is_rejoining())
mdcache->dump_cache(); // for DEBUG only
}
+ if (wasdegraded && !mdsmap->is_degraded())
+ dout(1) << "cluster recovered." << dendl;
// did someone go active?
if (is_active() || is_stopping()) {
void MDS::stopping_start()
{
dout(2) << "stopping_start" << dendl;
-
+
// start cache shutdown
mdcache->shutdown_start();
if (!gather) gather = new C_Gather;
(*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
}
-
+
+ // idalloc
+ if (allocv > mds->idalloc->get_committed_version()) {
+ dout(10) << " saving idalloc table, need " << allocv << dendl;
+ if (!gather) gather = new C_Gather;
+ mds->idalloc->save(gather->new_sub(), allocv);
+ }
+
+ // clientmap
+ if (clientmapv > mds->clientmap.get_committed()) {
+ dout(10) << " saving clientmap, need " << clientmapv << dendl;
+ if (!gather) gather = new C_Gather;
+ mds->clientmap.save(gather->new_sub(), clientmapv);
+ }
+
// pending commit atids
for (hash_set<version_t>::iterator p = pending_commit_atids.begin();
p != pending_commit_atids.end();
<< " pending commit (not yet acked), waiting" << dendl;
mds->anchorclient->wait_for_ack(*p, gather->new_sub());
}
+
+ // anchortable
+ if (anchortablev > mds->anchortable->get_committed_version()) {
+ dout(10) << " saving anchor table, need " << anchortablev << dendl;
+ if (!gather) gather = new C_Gather;
+ mds->anchortable->save(gather->new_sub());
+ }
+
+ // FIXME client requests...?
+ // audit handling of anchor transactions?
+ // open files?
return gather;
}
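// --- illustrative sketch, not part of the patch ---
// try_to_expire() above lazily allocates one gather context and hands a
// new_sub() to every save that is still in flight (idalloc, clientmap,
// anchortable, pending atids); the segment can only expire once every
// sub-context has completed.  A simplified, stand-alone model of that
// barrier idiom (this is not Ceph's C_Gather, just the same idea expressed
// with std::function callbacks):
#include <cassert>
#include <functional>

class gather {
  int pending = 0;
  bool activated = false;
  std::function<void()> onfinish;
  void maybe_finish() {
    if (activated && pending == 0 && onfinish)
      onfinish();
  }
public:
  // register one more thing we must wait for; returns its completion hook
  std::function<void()> new_sub() {
    ++pending;
    return [this] { --pending; maybe_finish(); };
  }
  // arm the final callback once all subs have been handed out
  void set_finisher(std::function<void()> fin) {
    onfinish = std::move(fin);
    activated = true;
    maybe_finish();
  }
};

int main() {
  bool expired = false;
  gather g;
  auto idalloc_saved   = g.new_sub();  // stands in for idalloc->save(...)
  auto clientmap_saved = g.new_sub();  // stands in for clientmap.save(...)
  g.set_finisher([&] { expired = true; });
  idalloc_saved();
  assert(!expired);                    // still waiting on the clientmap
  clientmap_saved();
  assert(expired);                     // all saves done; segment can expire
  return 0;
}
// --- end sketch ---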
// -----------------------
// ESession
-bool ESession::has_expired(MDS *mds)
-{
- if (mds->clientmap.get_committed() >= cmapv) {
- dout(10) << "ESession.has_expired newer clientmap " << mds->clientmap.get_committed()
- << " >= " << cmapv << " has committed" << dendl;
- return true;
- } else if (mds->clientmap.get_committing() >= cmapv) {
- dout(10) << "ESession.has_expired newer clientmap " << mds->clientmap.get_committing()
- << " >= " << cmapv << " is still committing" << dendl;
- return false;
- } else {
- dout(10) << "ESession.has_expired clientmap " << mds->clientmap.get_version()
- << " > " << cmapv << ", need to save" << dendl;
- return false;
- }
-}
-
-void ESession::expire(MDS *mds, Context *c)
-{
- dout(10) << "ESession.expire saving clientmap" << dendl;
- mds->clientmap.save(c, cmapv);
-}
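// --- illustrative sketch, not part of the patch ---
// The per-event has_expired()/expire() methods deleted above give way to
// per-segment version tracking: each event's update_segment() records the
// version it needs on its LogSegment, and LogSegment::try_to_expire()
// compares that against the committed version and issues a save if needed.
// A stand-alone model of that flow (the struct and function names here are
// illustrative, not the real MDS types):
#include <cassert>

struct segment_model {
  unsigned clientmapv = 0;           // highest clientmap version this segment needs
};

struct clientmap_model {
  unsigned version = 0;              // in-memory version
  unsigned committed = 0;            // last version safely on disk
  void save(unsigned v) { committed = v; }  // synchronous stand-in for the save
};

// journaling/replay path: stamp the segment with the version it depends on
static void update_segment(segment_model& seg, unsigned cmapv) {
  if (cmapv > seg.clientmapv)
    seg.clientmapv = cmapv;
}

// expiry path: save only if the segment still needs a newer commit
static bool try_to_expire(const segment_model& seg, clientmap_model& cm) {
  if (seg.clientmapv > cm.committed) {
    cm.save(seg.clientmapv);         // would go through a gather sub in the MDS
    return false;                    // had to issue a save this pass
  }
  return true;                       // nothing holding this segment back
}

int main() {
  segment_model seg;
  clientmap_model cm;
  update_segment(seg, 12);           // an ESession-like event needs v12
  assert(!try_to_expire(seg, cm));   // forces a save of v12 first
  assert(try_to_expire(seg, cm));    // second pass: v12 committed, expirable
  return 0;
}
// --- end sketch ---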
void ESession::update_segment()
{
// -----------------------
// EAnchor
-bool EAnchor::has_expired(MDS *mds)
-{
- version_t cv = mds->anchortable->get_committed_version();
- if (cv < version) {
- dout(10) << "EAnchor.has_expired v " << version << " > " << cv
- << ", still dirty" << dendl;
- return false; // still dirty
- } else {
- dout(10) << "EAnchor.has_expired v " << version << " <= " << cv
- << ", already flushed" << dendl;
- return true; // already flushed
- }
-}
-
-void EAnchor::expire(MDS *mds, Context *c)
-{
- dout(10) << "EAnchor.expire saving anchor table" << dendl;
- mds->anchortable->save(c);
-}
-
void EAnchor::update_segment()
{
_segment->anchortablev = version;
// EAnchorClient
-bool EAnchorClient::has_expired(MDS *mds)
-{
- return true;
-}
-
-void EAnchorClient::expire(MDS *mds, Context *c)
-{
- assert(0);
-}
-
void EAnchorClient::replay(MDS *mds)
{
dout(10) << " EAnchorClient.replay op " << op << " atid " << atid << dendl;