-- preemptive mdsmap sharing
-- how to disambiguate importstart w/o an importfinish?
-- der chicken/egg headache on immediately sending import_map, since imports are ambiguous.
-
+huh:
+- how to keep mds osd op ids unique after a failover?
+ - (and, how to flush out failed mds)
+
+
+- handle exporter recovery if importer fails during EXPORT_EXPORTING stage
+
+- delay sending import_map if an export is in progress?
+- finish exports before sending import_map? (if is_recovering or whatever...)
+
- ambiguous imports on active node should include in-progress imports!
+- how to effectively trim cache after resolve but before rejoin
+  - we need to eliminate unneeded non-auth metadata, without hosing potentially useful auth metadata
+
+
+- failures during recovery stages... rejoin
+
+
importmap only sent after exports have completed.
failures update export ack waitlists, so exports will complete if unrelated nodes fail.
failure of importer induces cleanup on exporter. no ambiguity.
-- no new mds may join if cluster is in a recovery state. creating|replay -> standby (unless failed)
+- no new mds may join if cluster is in a recovery state. starting -> standby (unless failed)
recoverer:
- enter rejoin state
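
(a minimal sketch of the "finish exports before sending import_map" idea above;
 Recoverer and its members are invented for illustration, not the real
 MDCache/Migrator interfaces:)

#include <iostream>
#include <set>

struct Recoverer {
  std::set<int> exports_in_flight;   // peers i am mid-export to
  std::set<int> want_import_map;     // peers owed an import_map

  void send_import_map(int who) {
    std::cout << "sending import_map to mds" << who << std::endl;
  }

  // defer the import_map while any export is still ambiguous.
  void peer_wants_import_map(int who) {
    if (exports_in_flight.empty())
      send_import_map(who);
    else
      want_import_map.insert(who);
  }

  // when the last export completes (or is cleaned up), flush the queue.
  void export_finished(int peer) {
    exports_in_flight.erase(peer);
    if (!exports_in_flight.empty()) return;
    for (std::set<int>::iterator p = want_import_map.begin();
         p != want_import_map.end();
         ++p)
      send_import_map(*p);
    want_import_map.clear();
  }
};

int main() {
  Recoverer r;
  r.exports_in_flight.insert(1);
  r.peer_wants_import_map(2);  // deferred; export to mds1 still in flight
  r.export_finished(1);        // flushes the queue: sends import_map to mds2
  return 0;
}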
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
- } else if (strcmp(args[i],"fullwalk") == 0) {
+ } else if (strcmp(args[i],"walk") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_FULLWALK );
//syn_sargs.push_back( atoi(args[++i]) );
} else if (strcmp(args[i],"randomwalk") == 0) {
case SYNCLIENT_MODE_FULLWALK:
{
- string sarg1 = get_sarg(0);
+ string sarg1;// = get_sarg(0);
if (run_me()) {
dout(2) << "fullwalk" << sarg1 << endl;
full_walk(sarg1);
{
if (time_to_stop()) return -1;
- // read dir
- map<string, inode_t> contents;
- int r = client->getdir(basedir.c_str(), contents);
- if (r < 0) {
- dout(1) << "readdir on " << basedir << " returns " << r << endl;
- return r;
- }
+ list<string> dirq;
+ dirq.push_back(basedir);
- for (map<string, inode_t>::iterator it = contents.begin();
- it != contents.end();
- it++) {
- string file = basedir + "/" + it->first;
+ while (!dirq.empty()) {
+ string dir = dirq.front();
+ dirq.pop_front();
- struct stat st;
- int r = client->lstat(file.c_str(), &st);
+ // read dir
+ map<string, inode_t> contents;
+ int r = client->getdir(dir.c_str(), contents);
if (r < 0) {
- dout(1) << "stat error on " << file << " r=" << r << endl;
+ dout(1) << "readdir on " << dir << " returns " << r << endl;
continue;
}
-
- if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) full_walk(file);
+
+ for (map<string, inode_t>::iterator it = contents.begin();
+ it != contents.end();
+ it++) {
+ if (it->first == ".") continue;
+ if (it->first == "..") continue;
+ string file = dir + "/" + it->first;
+
+ struct stat st;
+ int r = client->lstat(file.c_str(), &st);
+ if (r < 0) {
+ dout(1) << "stat error on " << file << " r=" << r << endl;
+ continue;
+ }
+
+ if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) {
+ dirq.push_back(file);
+ }
+ }
}
return 0;
#include <vector>
#include <string>
+// set<string>
+inline void _encode(const std::set<std::string>& s, bufferlist& bl)
+{
+ int n = s.size();
+ bl.append((char*)&n, sizeof(n));
+ for (std::set<std::string>::const_iterator it = s.begin();
+ it != s.end();
+ it++) {
+ ::_encode(*it, bl);
+ n--;
+ }
+ assert(n==0);
+}
+inline void _decode(std::set<std::string>& s, bufferlist& bl, int& off)
+{
+ s.clear();
+ int n;
+ bl.copy(off, sizeof(n), (char*)&n);
+ off += sizeof(n);
+ for (int i=0; i<n; i++) {
+ std::string v;
+ ::_decode(v, bl, off);
+ s.insert(v);
+ }
+ assert(s.size() == (unsigned)n);
+}
+
+
// set<T>
template<class T>
inline void _encode(const std::set<T>& s, bufferlist& bl)
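
// (aside: a self-contained round trip of the wire format above -- a raw int
// count, then each string; here std::string stands in for bufferlist, and the
// per-string layout (int length + bytes) is an assumption about ::_encode:)
#include <cassert>
#include <cstring>
#include <set>
#include <string>

static void encode_string_set(const std::set<std::string>& s, std::string& bl)
{
  int n = s.size();
  bl.append((char*)&n, sizeof(n));
  for (std::set<std::string>::const_iterator it = s.begin();
       it != s.end();
       ++it) {
    int len = it->size();
    bl.append((char*)&len, sizeof(len));
    bl.append(it->data(), len);
  }
}

static void decode_string_set(std::set<std::string>& s, const std::string& bl, int& off)
{
  s.clear();
  int n;
  memcpy(&n, bl.data() + off, sizeof(n));
  off += sizeof(n);
  for (int i=0; i<n; i++) {
    int len;
    memcpy(&len, bl.data() + off, sizeof(len));
    off += sizeof(len);
    s.insert(std::string(bl.data() + off, len));
    off += len;
  }
}

int main()
{
  std::set<std::string> in, out;
  in.insert("usr");
  in.insert("var");
  std::string bl;
  encode_string_set(in, bl);
  int off = 0;
  decode_string_set(out, bl, off);
  assert(out == in);
  return 0;
}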
#define DN_LOCK_SYNC 0
#define DN_LOCK_PREXLOCK 1
#define DN_LOCK_XLOCK 2
-#define DN_LOCK_UNPINNING 3 // waiting for pins to go away
+#define DN_LOCK_UNPINNING 3 // waiting for pins to go away .. FIXME REVIEW THIS CODE ..
#define DN_XLOCK_FOREIGN ((Message*)0x1) // not 0, not a valid pointer.
out << " dir_auth=" << dir.get_dir_auth();
out << " state=" << dir.get_state();
+ if (dir.state_test(CDIR_STATE_COMPLETE)) out << "|complete";
+ if (dir.state_test(CDIR_STATE_FREEZINGTREE)) out << "|freezingtree";
+ if (dir.state_test(CDIR_STATE_FROZENTREE)) out << "|frozentree";
+ if (dir.state_test(CDIR_STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
+ if (dir.state_test(CDIR_STATE_FROZENDIR)) out << "|frozendir";
+ if (dir.state_test(CDIR_STATE_FREEZINGDIR)) out << "|freezingdir";
+
out << " sz=" << dir.get_nitems() << "+" << dir.get_nnull();
if (dir.get_num_ref()) {
void CDir::unfreeze_tree()
{
dout(10) << "unfreeze_tree " << *this << endl;
- state_clear(CDIR_STATE_FROZENTREE);
-
- // unpin (may => FREEZEABLE) FIXME: is this order good?
- inode->auth_unpin();
- // waiters?
- finish_waiting(CDIR_WAIT_UNFREEZE);
+ if (state_test(CDIR_STATE_FROZENTREE)) {
+ // frozen. unfreeze.
+ state_clear(CDIR_STATE_FROZENTREE);
+
+ // unpin (may => FREEZEABLE) FIXME: is this order good?
+ inode->auth_unpin();
+
+ // waiters?
+ finish_waiting(CDIR_WAIT_UNFREEZE);
+ } else {
+ // freezing. stop it.
+ assert(state_test(CDIR_STATE_FREEZINGTREE));
+ state_clear(CDIR_STATE_FREEZINGTREE);
+
+ // cancel freeze waiters
+ finish_waiting(CDIR_WAIT_FREEZEABLE, -1);
+ }
}
bool CDir::is_freezing_tree()
|CDIR_STATE_DIRTY)
#define CDIR_MASK_STATE_IMPORT_KEPT (CDIR_STATE_IMPORT\
|CDIR_STATE_EXPORT\
- |CDIR_STATE_IMPORTINGEXPORT)
+ |CDIR_STATE_IMPORTINGEXPORT\
+ |CDIR_STATE_FROZENTREE)
#define CDIR_MASK_STATE_EXPORT_KEPT (CDIR_STATE_HASHED\
|CDIR_STATE_FROZENTREE\
|CDIR_STATE_FROZENDIR\
public:
CLock() :
- state(LOCK_LOCK),
+ state(LOCK_SYNC),
nread(0),
wrlock_by(0) {
}
#include "messages/MMDSImportMap.h"
#include "messages/MMDSCacheRejoin.h"
+#include "messages/MMDSCacheRejoinAck.h"
#include "messages/MDiscover.h"
#include "messages/MDiscoverReply.h"
// =====================
// recovery stuff
-void MDCache::send_import_maps()
-{
- dout(10) << "send_import_maps" << endl;
-
- for (set<int>::iterator p = want_import_map.begin();
- p != want_import_map.end();
- ++p)
- send_import_map(*p);
-
- want_import_map.clear();
-}
-
void MDCache::send_import_map(int who)
{
dout(10) << "send_import_map to mds" << who << endl;
p != imports.end();
p++) {
CDir *im = *p;
- m->add_import(im->ino());
-
- if (nested_exports.count(im)) {
- for (set<CDir*>::iterator q = nested_exports[im].begin();
- q != nested_exports[im].end();
- ++q) {
- CDir *ex = *q;
- m->add_import_export(im->ino(), ex->ino());
+
+ if (migrator->is_importing(im->ino())) {
+ // ambiguous (mid-import)
+ m->add_ambiguous_import(im->ino(),
+ migrator->get_import_bounds(im->ino()));
+ } else {
+ // not ambiguous.
+ m->add_import(im->ino());
+
+ if (nested_exports.count(im)) {
+ for (set<CDir*>::iterator q = nested_exports[im].begin();
+ q != nested_exports[im].end();
+ ++q) {
+ CDir *ex = *q;
+ m->add_import_export(im->ino(), ex->ino());
+ }
}
}
}
}
+
+/*
+ * during resolve state, we share import_maps to determine who
+ * is authoritative for which trees. we expect to get an import_map
+ * from _everyone_ in the recovery_set (the mds cluster at the time of
+ * the first failure).
+ */
void MDCache::handle_import_map(MMDSImportMap *m)
{
dout(7) << "handle_import_map from " << m->get_source() << endl;
int from = m->get_source().num();
- MMDSCacheRejoin *rejoin;
- if (mds->is_active() || mds->is_stopping())
- rejoin = new MMDSCacheRejoin;
-
// FIXME: check if we are a surviving ambiguous importer
// update my dir_auth values
if (ex->get_dir_auth() == CDIR_AUTH_PARENT)
ex->set_dir_auth(CDIR_AUTH_UNKNOWN);
}
-
- if (mds->is_active()) {
- // walk my cache to fill out CacheRejoin
- cache_rejoin_walk(im, rejoin);
- }
}
// note ambiguous imports too
pi != m->ambiguous_imap.end();
++pi)
mds->mdcache->other_ambiguous_imports[from][pi->first].swap( pi->second );
-
- if (mds->is_rejoin()) {
- assert(want_import_map.count(from));
- want_import_map.erase(from);
- if (want_import_map.empty()) {
- dout(10) << "got all import maps" << endl;
- disambiguate_imports();
- recalc_auth_bits();
- send_cache_rejoins();
- } else {
- dout(10) << "still waiting for importmaps from " << want_import_map << endl;
- }
- } else if (mds->is_active() || mds->is_stopping()) {
- mds->send_message_mds(rejoin, from, MDS_PORT_CACHE);
- }
+ // did i get them all?
+ got_import_map.insert(from);
+
+ if (got_import_map == recovery_set) {
+ dout(10) << "got all import maps, ready to rejoin" << endl;
+ disambiguate_imports();
+ recalc_auth_bits();
+
+ // move to rejoin state
+ mds->set_want_state(MDSMap::STATE_REJOIN);
+
+ } else {
+ dout(10) << "still waiting for more importmaps, got " << got_import_map
+ << ", need " << recovery_set << endl;
+ }
+
delete m;
}
+/*
+ * rejoin phase!
+ * we start out by sending rejoins to everyone in the recovery set.
+ *
+ * if _we_ are rejoining, send for all regions in our cache.
+ * if we are active|stopping, send only to nodes that are rejoining.
+ */
void MDCache::send_cache_rejoins()
{
dout(10) << "send_cache_rejoins " << endl;
+
+ map<int, MMDSCacheRejoin*> rejoins;
+ // build list of dir_auth regions
+ list<CDir*> dir_auth_regions;
+ for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
+ p != inode_map.end();
+ ++p) {
+ if (!p->second->is_dir()) continue;
+ if (!p->second->dir) continue;
+ if (p->second->dir->get_dir_auth() == CDIR_AUTH_PARENT) continue;
+
+ int auth = p->second->dir->get_dir_auth();
+ assert(auth >= 0);
+
+ if (auth == mds->get_nodeid()) continue; // skip my own regions!
+
+ if (rejoins.count(auth) == 0) {
+ if (mds->is_rejoin() || // if i am rejoining,
+ mds->mdsmap->is_rejoin(auth)) // or if they are rejoining,
+ rejoins[auth] = new MMDSCacheRejoin; // send a rejoin
+ else
+ continue; // otherwise, skip this region
+ }
+
+ // add to list
+ dout(10) << " on mds" << auth << " region " << *p->second << endl;
+ dir_auth_regions.push_back(p->second->dir);
+ }
+
+ // walk the regions
+ for (list<CDir*>::iterator p = dir_auth_regions.begin();
+ p != dir_auth_regions.end();
+ ++p) {
+ CDir *dir = *p;
+ int to = dir->authority();
+ cache_rejoin_walk(dir, rejoins[to]);
+ }
+
+ // send the messages
+ assert(rejoin_ack_gather.empty());
+ for (map<int,MMDSCacheRejoin*>::iterator p = rejoins.begin();
+ p != rejoins.end();
+ ++p) {
+ mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+ rejoin_ack_gather.insert(p->first);
+ }
}
+
void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
{
dout(10) << "cache_rejoin_walk " << *dir << endl;
p != dir->items.end();
++p) {
// dentry
- if (mds->is_rejoin())
- rejoin->add_dentry(dir->ino(), p->first, -1);
- else
- rejoin->add_dentry(dir->ino(), p->first, p->second->lockstate);
-
+ rejoin->add_dentry(dir->ino(), p->first);
+
// inode?
if (p->second->is_primary() && p->second->get_inode()) {
CInode *in = p->second->get_inode();
- if (mds->is_rejoin())
- rejoin->add_inode(in->ino(),
- -1, -1,
- in->get_caps_wanted());
- else
- rejoin->add_inode(in->ino(),
- in->hardlock.get_state(), in->filelock.get_state(),
- in->get_caps_wanted());
+ rejoin->add_inode(in->ino(),
+ in->get_caps_wanted());
// dir?
if (in->dir &&
}
+/*
+ * i got a rejoin.
+ *
+ * - reply with the lockstate
+ *
+ * if i am active|stopping,
+ * - remove source from replica list for everything not referenced here.
+ */
void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m)
{
dout(7) << "handle_cache_rejoin from " << m->get_source() << endl;
- //int from = m->get_source().num();
+ int from = m->get_source().num();
+
+ MMDSCacheRejoinAck *ack = new MMDSCacheRejoinAck;
+
+ if (mds->is_active() || mds->is_stopping()) {
+ dout(10) << "removing stale cache replicas" << endl;
+ // first, scour cache of replica references
+ for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
+ p != inode_map.end();
+ ++p) {
+ // inode
+ CInode *in = p->second;
+ if (in->is_replica(from) && m->inodes.count(p->first) == 0) {
+ inode_remove_replica(in, from);
+ dout(10) << " rem " << *in << endl;
+ }
+
+ // dentry
+ if (in->parent) {
+ CDentry *dn = in->parent;
+ if (dn->is_replica(from) &&
+ (m->dentries.count(dn->get_dir()->ino()) == 0 ||
+ m->dentries[dn->get_dir()->ino()].count(dn->get_name()) == 0)) {
+ dn->remove_replica(from);
+ dout(10) << " rem " << *dn << endl;
+ }
+ }
+ // dir
+ if (in->dir) {
+ CDir *dir = in->dir;
+ if (dir->is_replica(from) && m->dirs.count(p->first) == 0) {
+ dir->remove_replica(from);
+ dout(10) << " rem " << *dir << endl;
+ }
+ }
+ }
+ } else {
+ assert(mds->is_rejoin());
+ }
+
+ // dirs
+ for (set<inodeno_t>::iterator p = m->dirs.begin();
+ p != m->dirs.end();
+ ++p) {
+ CInode *diri = get_inode(*p);
+ assert(diri);
+ CDir *dir = diri->dir;
+ assert(dir);
+ int nonce = dir->add_replica(from);
+ dout(10) << " has " << *dir << endl;
+ ack->add_dir(*p, nonce);
+
+ // dentries
+ for (set<string>::iterator q = m->dentries[*p].begin();
+ q != m->dentries[*p].end();
+ ++q) {
+ CDentry *dn = dir->lookup(*q);
+ assert(dn);
+ int nonce = dn->add_replica(from);
+ dout(10) << " has " << *dn << endl;
+ ack->add_dentry(*p, *q, dn->get_lockstate(), nonce);
+ }
+ }
+
+ // inodes
+ for (map<inodeno_t,int>::iterator p = m->inodes.begin();
+ p != m->inodes.end();
+ ++p) {
+ CInode *in = get_inode(p->first);
+ assert(in);
+ int nonce = in->add_replica(from);
+ if (p->second)
+ in->mds_caps_wanted[from] = p->second;
+ else
+ in->mds_caps_wanted.erase(from);
+ in->hardlock.gather_set.erase(from); // just in case
+ in->filelock.gather_set.erase(from); // just in case
+ dout(10) << " has " << *in << endl;
+ ack->add_inode(p->first,
+ in->hardlock.get_replica_state(), in->filelock.get_replica_state(),
+ nonce);
+ }
+
+ // send ack
+ mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+ delete m;
+}
+
+
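+/*
+ * i got a rejoin ack from the auth.
+ *
+ * - record the replica nonces it assigned, and take its word on lock state.
+ * - once every node in rejoin_ack_gather has answered, go active.
+ */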
+void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m)
+{
+  dout(7) << "handle_cache_rejoin_ack from " << m->get_source() << endl;
+ int from = m->get_source().num();
+
+ // dirs
+ for (list<MMDSCacheRejoinAck::dirinfo>::iterator p = m->dirs.begin();
+ p != m->dirs.end();
+ ++p) {
+ CInode *diri = get_inode(p->dirino);
+ CDir *dir = diri->dir;
+ assert(dir);
+ dir->set_replica_nonce(p->nonce);
+ dout(10) << " got " << *dir << endl;
+
+ // dentries
+ for (map<string,MMDSCacheRejoinAck::dninfo>::iterator q = m->dentries[p->dirino].begin();
+ q != m->dentries[p->dirino].end();
+ ++q) {
+ CDentry *dn = dir->lookup(q->first);
+ assert(dn);
+ dn->set_replica_nonce(q->second.nonce);
+ dn->set_lockstate(q->second.lock);
+ dout(10) << " got " << *dn << endl;
+ }
+ }
+
+ // inodes
+ for (list<MMDSCacheRejoinAck::inodeinfo>::iterator p = m->inodes.begin();
+ p != m->inodes.end();
+ ++p) {
+ CInode *in = get_inode(p->ino);
+ assert(in);
+ in->set_replica_nonce(p->nonce);
+ in->hardlock.set_state(p->hardlock);
+ in->filelock.set_state(p->filelock);
+ dout(10) << " got " << *in << endl;
+ }
delete m;
+
+ // done?
+ rejoin_ack_gather.erase(from);
+ if (rejoin_ack_gather.empty()) {
+ dout(7) << "all done, going active!" << endl;
+ show_imports();
+ show_cache();
+ mds->set_want_state(MDSMap::STATE_ACTIVE);
+ } else {
+ dout(7) << "still need rejoin_ack from " << rejoin_ack_gather << endl;
+ }
+
}
+
+// ===============================================================================
+
void MDCache::rename_file(CDentry *srcdn,
CDentry *destdn)
{
handle_import_map((MMDSImportMap*)m);
break;
+ case MSG_MDS_CACHEREJOIN:
+ handle_cache_rejoin((MMDSCacheRejoin*)m);
+ break;
+ case MSG_MDS_CACHEREJOINACK:
+ handle_cache_rejoin_ack((MMDSCacheRejoinAck*)m);
+ break;
+
+
case MSG_MDS_DISCOVER:
handle_discover((MDiscover*)m);
break;
else if (nonce == in->get_replica_nonce(from)) {
// remove from our cached_by
dout(7) << "inode expire on " << *in << " from mds" << from << " cached_by was " << in->get_replicas() << endl;
- in->remove_replica(from);
- in->mds_caps_wanted.erase(from);
-
- // note: this code calls _eval more often than it needs to!
- // fix lock
- if (in->hardlock.is_gathering(from)) {
- in->hardlock.gather_set.erase(from);
- if (in->hardlock.gather_set.size() == 0)
- mds->locker->inode_hard_eval(in);
- }
- if (in->filelock.is_gathering(from)) {
- in->filelock.gather_set.erase(from);
- if (in->filelock.gather_set.size() == 0)
- mds->locker->inode_file_eval(in);
- }
-
- // alone now?
- if (!in->is_replicated()) {
- mds->locker->inode_hard_eval(in);
- mds->locker->inode_file_eval(in);
- }
+ inode_remove_replica(in, from);
}
else {
delete m;
}
+void MDCache::inode_remove_replica(CInode *in, int from)
+{
+ in->remove_replica(from);
+ in->mds_caps_wanted.erase(from);
+
+ // note: this code calls _eval more often than it needs to!
+ // fix lock
+ if (in->hardlock.is_gathering(from)) {
+ in->hardlock.gather_set.erase(from);
+ if (in->hardlock.gather_set.size() == 0)
+ mds->locker->inode_hard_eval(in);
+ }
+ if (in->filelock.is_gathering(from)) {
+ in->filelock.gather_set.erase(from);
+ if (in->filelock.gather_set.size() == 0)
+ mds->locker->inode_file_eval(in);
+ }
+
+ // alone now?
+ if (!in->is_replicated()) {
+ mds->locker->inode_hard_eval(in);
+ mds->locker->inode_file_eval(in);
+ }
+}
int MDCache::send_dir_updates(CDir *dir, bool bcast)
// from MMDSImportMaps
map<int, map<inodeno_t, set<inodeno_t> > > other_ambiguous_imports;
- set<int> want_import_map; // nodes i need to send my import map to (when exports finish)
- set<int> import_map_gather; // nodes i need an import_map from
+ set<int> recovery_set;
+  set<int> got_import_map;      // nodes i have received import maps from
+ set<int> rejoin_ack_gather; // nodes i need a rejoin ack from
void handle_import_map(MMDSImportMap *m);
void handle_cache_rejoin(MMDSCacheRejoin *m);
void handle_cache_rejoin_ack(MMDSCacheRejoinAck *m);
void disambiguate_imports();
- void send_cache_rejoins();
void cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin);
void send_cache_rejoin_acks();
public:
void send_import_map(int who);
-  void send_import_maps();
+ void send_cache_rejoins();
void set_recovery_set(set<int>& s) {
- want_import_map.swap(s);
+ recovery_set = s;
}
// ambiguous imports
void log_import_map(Context *onsync=0);
-
-
+
// cache
void set_cache_size(size_t max) { lru.lru_set_max(max); }
size_t get_cache_size() { return lru.lru_get_size(); }
else
lru.lru_midtouch(dn);
}
+
+ void inode_remove_replica(CInode *in, int rep);
+
void rename_file(CDentry *srcdn, CDentry *destdn);
public:
// is it new?
if (epoch <= mdsmap->get_epoch()) {
- dout(1) << " old map epoch " << epoch << " < " << mdsmap->get_epoch()
+ dout(1) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch()
<< ", discarding" << endl;
delete m;
return;
// note some old state
int oldwhoami = whoami;
int oldstate = state;
- set<int> oldrejoin;
- mdsmap->get_mds_set(oldrejoin, MDSMap::STATE_REJOIN);
+ set<int> oldresolve;
+ mdsmap->get_mds_set(oldresolve, MDSMap::STATE_RESOLVE);
+ bool wasrejoining = mdsmap->is_rejoining();
set<int> oldfailed;
mdsmap->get_mds_set(oldfailed, MDSMap::STATE_FAILED);
}
-
- // is anybody rejoining?
- if (is_rejoin() || is_active() || is_stopping()) {
- set<int> rejoin;
- mdsmap->get_mds_set(rejoin, MDSMap::STATE_REJOIN);
- dout(10) << "rejoin set is " << rejoin << ", was " << oldrejoin << endl;
- for (set<int>::iterator p = rejoin.begin(); p != rejoin.end(); ++p) {
+ // is anyone resolving?
+ if (is_resolve() || is_rejoin() || is_active() || is_stopping()) {
+ set<int> resolve;
+ mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
+ if (oldresolve != resolve)
+ dout(10) << "resolve set is " << resolve << ", was " << oldresolve << endl;
+ for (set<int>::iterator p = resolve.begin(); p != resolve.end(); ++p) {
if (*p == whoami) continue;
- if (oldrejoin.count(*p) == 0 || // if other guy newly rejoin, or
- oldstate == MDSMap::STATE_REPLAY) // if i'm newly rejoin,
+      if (oldresolve.count(*p) == 0 ||     // if the other guy is newly resolving, or
+	  oldstate == MDSMap::STATE_REPLAY)  // if i'm newly resolving,
mdcache->send_import_map(*p); // share my import map
}
}
+
+ // is everybody finally rejoining?
+ if (is_rejoin() || is_active() || is_stopping()) {
+ if (!wasrejoining && mdsmap->is_rejoining()) {
+ mdcache->send_cache_rejoins();
+ }
+ }
// did anyone go down?
if (is_active() || is_stopping()) {
assert(mdlog->get_read_pos() == mdlog->get_write_pos());
}
- mark_active();
+ set_want_state(MDSMap::STATE_ACTIVE);
}
case 6:
// done with replay!
- if (mdsmap->get_num_mds(MDSMap::STATE_REJOIN) == 0 &&
+ if (mdsmap->get_num_mds(MDSMap::STATE_ACTIVE) == 0 &&
+ mdsmap->get_num_mds(MDSMap::STATE_STOPPING) == 0 &&
+ mdsmap->get_num_mds(MDSMap::STATE_RESOLVE) == 0 &&
+ mdsmap->get_num_mds(MDSMap::STATE_REJOIN) == 0 &&
mdsmap->get_num_mds(MDSMap::STATE_REPLAY) == 1 && // me
mdsmap->get_num_mds(MDSMap::STATE_FAILED) == 0) {
dout(2) << "boot_replay " << step << ": i am alone, moving to state active" << endl;
- want_state = MDSMap::STATE_ACTIVE;
+ set_want_state(MDSMap::STATE_ACTIVE);
} else {
- dout(2) << "boot_replay " << step << ": i am not alone, moving to state rejoin" << endl;
- want_state = MDSMap::STATE_REJOIN;
+ dout(2) << "boot_replay " << step << ": i am not alone, moving to state resolve" << endl;
+ set_want_state(MDSMap::STATE_RESOLVE);
}
- beacon_send();
break;
}
}
-void MDS::mark_active()
+void MDS::set_want_state(int s)
{
- dout(3) << "mark_active" << endl;
- want_state = MDSMap::STATE_ACTIVE;
+ dout(3) << "set_want_state " << MDSMap::get_state_name(s) << endl;
+ want_state = s;
beacon_send();
}
}
// go
- want_state = MDSMap::STATE_STOPPING;
- beacon_send();
+ set_want_state(MDSMap::STATE_STOPPING);
return 0;
}
dout(1) << " handle_shutdown_start" << endl;
// set flag
- want_state = MDSMap::STATE_STOPPING;
- beacon_send();
+ set_want_state(MDSMap::STATE_STOPPING);
delete m;
}
dout(1) << "shutdown_final" << endl;
// send final down:out beacon (it doesn't matter if this arrives)
- want_state = MDSMap::STATE_OUT;
- beacon_send();
+ set_want_state(MDSMap::STATE_OUT);
// stop timers
if (beacon_killer) {
void MDS::my_dispatch(Message *m)
{
+ // from bad mds?
+ if (m->get_source().is_mds()) {
+ int from = m->get_source().num();
+ if (!mdsmap->have_inst(from) ||
+ mdsmap->get_inst(from) != m->get_source_inst()) {
+ // bogus mds?
+ if (m->get_type() != MSG_MDS_MAP) {
+ dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
+ << ", dropping" << endl;
+ delete m;
+ return;
+ } else {
+ dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
+ << ", but it's an mdsmap, looking at it" << endl;
+ }
+ }
+ }
+
switch (m->get_dest_port()) {
dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to up:stopped" << endl;
// tell monitor we shut down cleanly.
- want_state = MDSMap::STATE_STOPPED;
- beacon_send();
+ set_want_state(MDSMap::STATE_STOPPED);
}
}
case MSG_PING:
handle_ping((MPing*)m);
return;
+
+ default:
+ assert(0);
}
}
bool is_starting() { return state == MDSMap::STATE_STARTING; }
bool is_standby() { return state == MDSMap::STATE_STANDBY; }
bool is_replay() { return state == MDSMap::STATE_REPLAY; }
+ bool is_resolve() { return state == MDSMap::STATE_RESOLVE; }
bool is_rejoin() { return state == MDSMap::STATE_REJOIN; }
bool is_active() { return state == MDSMap::STATE_ACTIVE; }
bool is_stopping() { return state == MDSMap::STATE_STOPPING; }
bool is_stopped() { return state == MDSMap::STATE_STOPPED; }
- void mark_active();
+ void set_want_state(int s);
// -- waiters --
class MDSMap {
public:
+ // mds states
static const int STATE_DNE = 0; // down, never existed.
static const int STATE_OUT = 1; // down, once existed, but no imports, empty log.
static const int STATE_FAILED = 2; // down, holds (er, held) metadata; needs to be recovered.
static const int STATE_CREATING = 4; // up, creating MDS instance (new journal, idalloc..)
static const int STATE_STARTING = 5; // up, starting prior out MDS instance.
static const int STATE_REPLAY    =  6;  // up, scanning journal, recovering any shared state
- static const int STATE_REJOIN = 7; // up, replayed journal, rejoining distributed cache
- static const int STATE_ACTIVE = 8; // up, active
- static const int STATE_STOPPING = 9; // up, exporting metadata (-> standby or out)
- static const int STATE_STOPPED = 10; // up, finished stopping. like standby, but not avail to takeover.
+ static const int STATE_RESOLVE = 7; // up, disambiguating partial distributed operations (import/export, ...rename?)
+ static const int STATE_REJOIN = 8; // up, replayed journal, rejoining distributed cache
+ static const int STATE_ACTIVE = 9; // up, active
+ static const int STATE_STOPPING = 10; // up, exporting metadata (-> standby or out)
+ static const int STATE_STOPPED = 11; // up, finished stopping. like standby, but not avail to takeover.
static const char *get_state_name(int s) {
switch (s) {
case STATE_CREATING: return "up:creating";
case STATE_STARTING: return "up:starting";
case STATE_REPLAY: return "up:replay";
+ case STATE_RESOLVE: return "up:resolve";
case STATE_REJOIN: return "up:rejoin";
case STATE_ACTIVE: return "up:active";
case STATE_STOPPING: return "up:stopping";
p != mds_state.end();
p++)
if (is_failed(p->first) ||
- is_replay(p->first) || is_rejoin(p->first) ||
+ is_replay(p->first) || is_resolve(p->first) || is_rejoin(p->first) ||
is_active(p->first) || is_stopping(p->first))
s.insert(p->first);
}
- // state
+ // mds states
bool is_down(int m) { return is_dne(m) || is_out(m) || is_failed(m); }
bool is_up(int m) { return !is_down(m); }
bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; }
bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; }
bool is_replay(int m) { return mds_state.count(m) && mds_state[m] == STATE_REPLAY; }
+ bool is_resolve(int m) { return mds_state.count(m) && mds_state[m] == STATE_RESOLVE; }
bool is_rejoin(int m) { return mds_state.count(m) && mds_state[m] == STATE_REJOIN; }
bool is_active(int m) { return mds_state.count(m) && mds_state[m] == STATE_ACTIVE; }
bool is_stopping(int m) { return mds_state.count(m) && mds_state[m] == STATE_STOPPING; }
bool is_stopped(int m) { return mds_state.count(m) && mds_state[m] == STATE_STOPPED; }
+ bool has_created(int m) { return mds_created.count(m); }
+
+ // cluster states
bool is_degraded() {
- return get_num_mds(STATE_REPLAY) + get_num_mds(STATE_REJOIN) + get_num_mds(STATE_FAILED);
+ return get_num_mds(STATE_REPLAY) +
+ get_num_mds(STATE_RESOLVE) +
+ get_num_mds(STATE_REJOIN) +
+ get_num_mds(STATE_FAILED);
}
- bool is_created(int m) {
- return mds_created.count(m);
+ /*bool is_resolving() { // nodes are resolving distributed ops
+ return get_num_mds(STATE_RESOLVE);
+ }*/
+ bool is_rejoining() {
+ // nodes are rejoining cache state
+ return get_num_mds(STATE_REJOIN) > 0 &&
+ get_num_mds(STATE_RESOLVE) == 0 &&
+ get_num_mds(STATE_REPLAY) == 0 &&
+ get_num_mds(STATE_FAILED) == 0;
}
+
int get_state(int m) {
if (mds_state.count(m)) return mds_state[m];
return STATE_OUT;
+
// ==========================================================
-// IMPORT/EXPORT
+// mds failure handling
+
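+/*
+ * a peer mds failed.  walk my in-progress exports: if the failed node
+ * was the importer, unwind whatever stage we had reached; if it was a
+ * bystander we wanted a notify_ack from, fake the ack so the export can
+ * still finish.
+ */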
+void Migrator::handle_mds_failure(int who)
+{
+ dout(5) << "handle_mds_failure mds" << who << endl;
+
+ // check my exports
+ map<CDir*,int>::iterator p = export_state.begin();
+ while (p != export_state.end()) {
+ map<CDir*,int>::iterator next = p;
+ next++;
+ CDir *dir = p->first;
+
+ if (export_peer[dir] == who) {
+ // the guy i'm exporting to failed.
+ // clean up.
+ dout(10) << "cleaning up export state " << p->second << " of " << *dir << endl;
+
+ switch (p->second) {
+ case EXPORT_DISCOVERING:
+ dout(10) << "state discovering : canceling freeze and removing auth_pin" << endl;
+ dir->unfreeze_tree(); // cancel the freeze
+ dir->auth_unpin(); // remove the auth_pin (that was holding up the freeze)
+ break;
+
+ case EXPORT_FREEZING:
+ dout(10) << "state freezing : canceling freeze" << endl;
+ dir->unfreeze_tree(); // cancel the freeze
+ break;
+
+ case EXPORT_LOGGINGSTART:
+ case EXPORT_PREPPING:
+ dout(10) << "state loggingstart|prepping : logging EExportFinish(false)" << endl;
+ mds->mdlog->submit_entry(new EExportFinish(dir,false));
+ // logger will unfreeze.
+ break;
+
+ case EXPORT_EXPORTING:
+ dout(10) << "state exporting : logging EExportFinish(false), reversing, and unfreezing" << endl;
+ mds->mdlog->submit_entry(new EExportFinish(dir,false));
+ reverse_export(dir);
+ dir->unfreeze_tree();
+ break;
+
+ case EXPORT_LOGGINGFINISH:
+ dout(10) << "state loggingfinish : doing nothing, we were successful." << endl;
+ break;
+
+ default:
+ assert(0);
+ }
+
+ export_state.erase(dir);
+ export_peer.erase(dir);
+
+ } else {
+ // third party failed. potential peripheral damage?
+ if (p->second == EXPORT_EXPORTING) {
+ // yeah, i'm waiting for acks, let's fake theirs.
+ if (export_notify_ack_waiting[dir].count(who)) {
+ dout(10) << "faking export_dir_notify_ack from mds" << who
+ << " on " << *dir << " to mds" << export_peer[dir]
+ << endl;
+ export_notify_ack_waiting[dir].erase(who);
+ if (export_notify_ack_waiting[dir].empty())
+ export_dir_acked(dir);
+ }
+ }
+ }
+
+ // next!
+ p = next;
+ }
+}
+
+
+
+
+
+
+// ==========================================================
+// EXPORT
class C_MDC_ExportFreeze : public Context {
C_MDC_ExportFreeze(Migrator *m, CDir *e, int d) :
mig(m), ex(e), dest(d) {}
virtual void finish(int r) {
- mig->export_dir_frozen(ex, dest);
+ if (r >= 0)
+ mig->export_dir_frozen(ex, dest);
}
};
}
// ok, let's go.
-
- exporting.insert(dir);
+ assert(export_state.count(dir) == 0);
+ export_state[dir] = EXPORT_DISCOVERING;
+ export_peer[dir] = dest;
// send ExportDirDiscover (ask target)
mds->send_message_mds(new MExportDirDiscover(dir->inode), dest, MDS_PORT_MIGRATOR);
dout(7) << "export_dir_discover_ack from " << m->get_source()
<< " on " << *dir << ", releasing auth_pin" << endl;
+
+ export_state[dir] = EXPORT_FREEZING;
+
dir->auth_unpin(); // unpin to allow freeze to complete
delete m; // done
{
// subtree is now frozen!
dout(7) << "export_dir_frozen on " << *dir << " to " << dest << endl;
+ export_state[dir] = EXPORT_LOGGINGSTART;
show_imports();
void Migrator::export_dir_frozen_logged(CDir *dir, MExportDirPrep *prep, int dest)
{
dout(7) << "export_dir_frozen_logged " << *dir << endl;
+
+ if (export_state.count(dir) == 0 ||
+ export_state[dir] != EXPORT_LOGGINGSTART) {
+ // export must have aborted.
+    dout(7) << "export must have aborted, unfreezing and deleting my old prep message" << endl;
+ delete prep;
+ dir->unfreeze_tree(); // cancel the freeze
+ return;
+ }
+
+ export_state[dir] = EXPORT_PREPPING;
mds->send_message_mds(prep, dest, MDS_PORT_MIGRATOR);
}
dout(7) << "export_dir_prep_ack " << *dir << ", starting export" << endl;
+ if (export_state.count(dir) == 0 ||
+ export_state[dir] != EXPORT_PREPPING) {
+ // export must have aborted.
+ dout(7) << "export must have aborted, unfreezing" << endl;
+ dir->unfreeze_tree();
+ return;
+ }
+
// start export.
+ export_state[dir] = EXPORT_EXPORTING;
export_dir_go(dir, m->get_source().num());
// done
// update imports/exports
CDir *containing_import = cache->get_auth_container(dir);
+ set<CDir*> bounds;
+
if (containing_import == dir) {
dout(7) << " i'm rexporting a previous import" << endl;
assert(dir->is_import());
// add to export message
req->add_export(nested);
+ bounds.insert(nested);
// nested beneath our new export *in; remove!
dout(7) << " export " << *nested << " was nested beneath us; removing from export list(s)" << endl;
// add to msg
req->add_export(nested);
+ bounds.insert(nested);
} else {
dout(12) << " export " << *nested << " is under other export " << *containing_export << ", which is unrelated" << endl;
assert(cache->get_auth_container(containing_export) != containing_import);
}
}
+
// note new authority (locally)
if (dir->inode->authority() == dest)
dir->set_dir_auth( CDIR_AUTH_PARENT );
else
dir->set_dir_auth( dest );
+ // note bounds
+ export_bounds[dir].swap(bounds);
+
+
// make list of nodes i expect an export_dir_notify_ack from
// (everyone w/ this dir open, but me!)
assert(export_notify_ack_waiting[dir].empty());
assert(export_notify_ack_waiting[dir].count( dest ));
// fill export message with cache data
- C_Contexts *fin = new C_Contexts;
+ C_Contexts *fin = new C_Contexts; // collect all the waiters
int num_exported_inodes = export_dir_walk( req,
fin,
dir, // base
assert(export_notify_ack_waiting[dir].count(from));
export_notify_ack_waiting[dir].erase(from);
+ dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from
+ << ", still need (" << export_notify_ack_waiting[dir] << ")" << endl;
+
// done?
if (export_notify_ack_waiting[dir].empty()) {
- export_notify_ack_waiting.erase(dir);
-
+ export_dir_acked(dir);
+ } else {
dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from
- << ", last one!" << endl;
+ << ", still waiting for " << export_notify_ack_waiting[dir] << endl;
+ }
+
+ delete m;
+}
+
- // log export completion, then finish (unfreeze, trigger finish context, etc.)
- mds->mdlog->submit_entry(new EExportFinish(dir, true),
- new C_MDS_ExportFinishLogged(this, dir));
+/*
+ * this happens if the dest fails after i send the export data but before it is acked.
+ * that is, we don't know they safely received and logged it, so we reverse our changes
+ * and go on.
+ */
+void Migrator::reverse_export(CDir *dir)
+{
+ dout(7) << "reverse_export " << *dir << endl;
+
+ assert(export_state[dir] == EXPORT_EXPORTING);
+ assert(export_bounds.count(dir));
+
+ set<CDir*> bounds;
+ bounds.swap(export_bounds[dir]);
+ export_bounds.erase(dir);
+
+ // -- adjust dir_auth --
+ // base
+ CDir *im = dir;
+ if (dir->get_inode()->authority() == mds->get_nodeid()) {
+ // parent is already me. adding to existing import.
+ im = mds->mdcache->get_auth_container(dir);
+ assert(im);
+ mds->mdcache->nested_exports[im].erase(dir);
+ dir->set_dir_auth( CDIR_AUTH_PARENT );
+ dir->state_set(CDIR_STATE_EXPORT);
+ dir->get(CDir::PIN_EXPORT);
} else {
- dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from
- << ", still waiting for " << export_notify_ack_waiting[dir] << endl;
+ // parent isn't me. new import.
+ mds->mdcache->imports.insert(dir);
+ dir->set_dir_auth( mds->get_nodeid() );
+ dir->state_set(CDIR_STATE_IMPORT);
+ dir->get(CDir::PIN_IMPORT);
}
- delete m;
+ dout(10) << " base " << *dir << endl;
+ if (dir != im)
+ dout(10) << " under " << *im << endl;
+
+ // bounds
+ for (set<CDir*>::iterator p = bounds.begin();
+ p != bounds.end();
+ ++p) {
+ CDir *bd = *p;
+
+ if (bd->get_dir_auth() == mds->get_nodeid()) {
+ // still me. was an import.
+ mds->mdcache->imports.erase(bd);
+ bd->set_dir_auth( CDIR_AUTH_PARENT );
+ bd->state_clear(CDIR_STATE_IMPORT);
+ bd->put(CDir::PIN_IMPORT);
+ // move nested exports.
+ for (set<CDir*>::iterator q = mds->mdcache->nested_exports[bd].begin();
+ q != mds->mdcache->nested_exports[bd].end();
+ ++q)
+ mds->mdcache->nested_exports[im].insert(*q);
+ mds->mdcache->nested_exports.erase(bd);
+ } else {
+ // not me anymore. now an export.
+ mds->mdcache->exports.insert(bd);
+ mds->mdcache->nested_exports[im].insert(bd);
+ assert(bd->get_dir_auth() != CDIR_AUTH_PARENT);
+ bd->set_dir_auth( CDIR_AUTH_UNKNOWN );
+ bd->state_set(CDIR_STATE_EXPORT);
+ bd->get(CDir::PIN_EXPORT);
+ }
+
+ dout(10) << " bound " << *bd << endl;
+ }
+
+
+ // -- adjust auth bits --
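+  // FIXME: nothing here yet -- auth bits on the reclaimed subtree still
+  // need to be adjusted back.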
+
+
+
+
}
+void Migrator::export_dir_acked(CDir *dir)
+{
+ dout(7) << "export_dir_acked " << *dir << endl;
+ export_notify_ack_waiting.erase(dir);
+
+ export_state[dir] = EXPORT_LOGGINGFINISH;
+ export_bounds.erase(dir);
+
+ // log export completion, then finish (unfreeze, trigger finish context, etc.)
+ mds->mdlog->submit_entry(new EExportFinish(dir, true),
+ new C_MDS_ExportFinishLogged(this, dir));
+}
+
/*
* once i get all teh notify_acks i can finish
*/
void Migrator::export_dir_finish(CDir *dir)
{
- // send finish/commit to new auth
- mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_MIGRATOR);
-
- // remove from exporting list
- exporting.erase(dir);
-
+ dout(7) << "export_dir_finish " << *dir << endl;
+
+ if (export_state.count(dir)) {
+ // send finish/commit to new auth
+ mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_MIGRATOR);
+
+ // remove from exporting list
+ export_state.erase(dir);
+ export_peer.erase(dir);
+ } else {
+ dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << endl;
+ }
+
// unfreeze
- dout(7) << "export_dir_finish " << *dir << ", unfreezing" << endl;
+ dout(7) << "export_dir_finish unfreezing" << endl;
dir->unfreeze_tree();
-
+
// unpin path
dout(7) << "export_dir_finish unpinning path" << endl;
vector<CDentry*> trace;
+// ==========================================================
+// IMPORT
-// IMPORTS
-
class C_MDC_ExportDirDiscover : public Context {
Migrator *mig;
MExportDirDiscover *m;
// pin auth too, until the import completes.
in->auth_pin();
+
+ import_state[in->ino()] = IMPORT_DISCOVERED;
+
// reply
dout(7) << " sending export_dir_discover_ack on " << *in << endl;
// auth pin too
dir->auth_pin();
diri->auth_unpin();
+
+ // change import state
+ import_state[diri->ino()] = IMPORT_PREPPING;
// assimilate traces to exports
for (list<CInodeDiscover*>::iterator it = m->get_inodes().begin();
CInode *in = cache->get_inode(*it);
assert(in);
+ // note bound.
+ import_bounds[dir->ino()].insert(*it);
+
if (!in->dir) {
dout(7) << " opening nested export on " << *in << endl;
cache->open_remote_dir(in,
dout(7) << " all ready, sending export_dir_prep_ack on " << *dir << endl;
mds->send_message_mds(new MExportDirPrepAck(dir->ino()),
m->get_source().num(), MDS_PORT_MIGRATOR);
-
+
+ // note new state
+ import_state[diri->ino()] = IMPORT_PREPPED;
+
// done
delete m;
}
// mark export point frozenleaf
ex->get(CDir::PIN_FREEZELEAF);
ex->state_set(CDIR_STATE_FROZENTREELEAF);
- import_freeze_leaves[dir->ino()].insert(ex); // and take note!
-
+ assert(import_bounds[dir->ino()].count(*it)); // we took note during prep stage
+
// remove our pin
ex->put(CDir::PIN_IMPORTINGEXPORT);
ex->state_clear(CDIR_STATE_IMPORTINGEXPORT);
// adjust popularity
mds->balancer->add_import(dir);
+ dout(7) << "handle_export_dir did " << *dir << endl;
+
// log it
mds->mdlog->submit_entry(le,
new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num(),
imported_subdirs, m->get_exports()));
+ // note state
+ import_state[dir->ino()] = IMPORT_LOGGINGSTART;
+
// some stats
if (mds->logger) {
mds->logger->inc("im");
{
dout(7) << "import_dir_logged " << *dir << endl;
- assert(0); // test die
+ // note state
+ import_state[dir->ino()] = IMPORT_ACKING;
// send notify's etc.
dout(7) << "sending notifyack for " << *dir << " to old auth mds" << from << endl;
dout(7) << "handle_export_dir_finish logging import_finish on " << *dir << endl;
assert(dir->is_auth());
+ // note state
+ import_state[dir->ino()] = IMPORT_LOGGINGFINISH;
+
// log
mds->mdlog->submit_entry(new EImportFinish(dir, true),
new C_MDS_ImportDirLoggedFinish(this,dir));
void Migrator::import_dir_logged_finish(CDir *dir)
{
- dout(7) << "import_dir_finish" << endl;
-
- dout(5) << "done with import of " << *dir << endl;
- show_imports();
- if (mds->logger) {
- mds->logger->set("nex", cache->exports.size());
- mds->logger->set("nim", cache->imports.size());
- }
+ dout(7) << "import_dir_logged_finish " << *dir << endl;
// un auth pin (other exports can now proceed)
dir->auth_unpin();
// unfreeze!
- for (set<CDir*>::iterator p = import_freeze_leaves[dir->ino()].begin();
- p != import_freeze_leaves[dir->ino()].end();
+ for (set<inodeno_t>::iterator p = import_bounds[dir->ino()].begin();
+ p != import_bounds[dir->ino()].end();
++p) {
- (*p)->put(CDir::PIN_FREEZELEAF);
- (*p)->state_clear(CDIR_STATE_FROZENTREELEAF);
+    CInode *diri = mds->mdcache->get_inode(*p);
+    CDir *bound = diri->dir;
+    assert(bound->state_test(CDIR_STATE_FROZENTREELEAF));
+    bound->put(CDir::PIN_FREEZELEAF);
+    bound->state_clear(CDIR_STATE_FROZENTREELEAF);
}
- import_freeze_leaves.erase(dir->ino());
dir->unfreeze_tree();
+ // clear import state (we're done!)
+ import_state.erase(dir->ino());
+ import_bounds.erase(dir->ino());
// ok now finish contexts
dout(5) << "finishing any waiters on imported data" << endl;
dir->finish_waiting(CDIR_WAIT_IMPORTED);
+ // log it
+ if (mds->logger) {
+ mds->logger->set("nex", cache->exports.size());
+ mds->logger->set("nim", cache->imports.size());
+ }
+ show_imports();
// is it empty?
if (dir->get_size() == 0 &&
// add to journal entry
le->metablob.add_dir(dir, true); // Hmm: false would be okay in some cases
+ int num_imported = 0;
+
if (dir->is_hashed()) {
// do nothing; dir is hashed
- return 0;
} else {
// take all waiters on this dir
// NOTE: a pass of imported data is guaranteed to get all of my waiters because
dout(15) << "doing contents" << endl;
// contents
- int num_imported = 0;
long nden = dstate.get_nden();
for (; nden>0; nden--) {
le->metablob.add_dentry(dn, true); // Hmm: might we do dn->is_dirty() here instead?
}
- return num_imported;
}
+
+ dout(7) << " import_dir_block done " << *dir << endl;
+ return num_imported;
}
MDS *mds;
MDCache *cache;
+ // -- exports --
+ // export stages. used to clean up intelligently if there's a failure.
+  const static int EXPORT_DISCOVERING = 1; // dest is discovering export dir
+ const static int EXPORT_FREEZING = 2; // we're freezing the dir tree
+ const static int EXPORT_LOGGINGSTART = 3; // we're logging EExportStart
+ const static int EXPORT_PREPPING = 4; // sending dest spanning tree to export bounds
+ const static int EXPORT_EXPORTING = 5; // sent actual export, waiting for acks
+ const static int EXPORT_LOGGINGFINISH = 6; // logging EExportFinish
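+  //
+  // normal progression: DISCOVERING -> FREEZING -> LOGGINGSTART
+  //                     -> PREPPING -> EXPORTING -> LOGGINGFINISH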
+
// export fun
- set<CDir*> exporting;
- map<CDir*, set<int> > export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from
+ map<CDir*, int> export_state;
+ map<CDir*, int> export_peer;
+ map<CDir*, set<CDir*> > export_bounds;
+ map<CDir*, set<int> > export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from
map<CDir*, list<inodeno_t> > export_proxy_inos;
map<CDir*, list<inodeno_t> > export_proxy_dirinos;
set<inodeno_t> stray_export_warnings; // notifies i haven't seen
map<inodeno_t, MExportDirNotify*> stray_export_notifies;
- // import muck
- map<inodeno_t, set<CDir*> > import_freeze_leaves;
- // hashing madness
+ // -- imports --
+ const static int IMPORT_DISCOVERED = 1; // waiting for prep
+ const static int IMPORT_PREPPING = 2; // opening dirs on bounds
+ const static int IMPORT_PREPPED = 3; // opened bounds, waiting for import
+  const static int IMPORT_LOGGINGSTART = 4; // got import, logging EImportStart
+  const static int IMPORT_ACKING = 5; // logged, sent acks
+  const static int IMPORT_LOGGINGFINISH = 6;
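+  //
+  // normal progression: DISCOVERED -> PREPPING -> PREPPED
+  //                     -> LOGGINGSTART -> ACKING -> LOGGINGFINISH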
+
+ map<inodeno_t,int> import_state;
+ map<inodeno_t,set<inodeno_t> > import_bounds;
+
+
+ // -- hashing madness --
multimap<CDir*, int> unhash_waiting; // nodes i am waiting for UnhashDirAck's from
multimap<inodeno_t, inodeno_t> import_hashed_replicate_waiting; // nodes i am waiting to discover to complete my import of a hashed dir
// maps frozen_dir_ino's to waiting-for-discover ino's.
multimap<inodeno_t, inodeno_t> import_hashed_frozen_waiting; // dirs i froze (for the above)
+
+
public:
// -- cons --
Migrator(MDS *m, MDCache *c) : mds(m), cache(c) {}
void dispatch(Message*);
- //bool is_importing();
- bool is_exporting(CDir *dir = 0) {
- if (dir)
- return exporting.count(dir);
- else
- return !exporting.empty();
+
+ // -- status --
+ int is_exporting(CDir *dir) {
+ if (export_state.count(dir)) return export_state[dir];
+ return 0;
+ }
+ bool is_exporting() { return !export_state.empty(); }
+ int is_importing(inodeno_t dirino) {
+ if (import_state.count(dirino)) return import_state[dirino];
+ return 0;
+ }
+ bool is_importing() { return !import_state.empty(); }
+ const set<inodeno_t>& get_import_bounds(inodeno_t base) {
+ assert(import_bounds.count(base));
+ return import_bounds[base];
}
+
+ // -- misc --
+ void handle_mds_failure(int who);
+ void show_imports();
+
+
// -- import/export --
// exporter
public:
CDir *basedir,
CDir *dir,
int newauth);
- void export_dir_finish(CDir *dir);
void handle_export_dir_notify_ack(MExportDirNotifyAck *m);
-
+ void reverse_export(CDir *dir);
+ void export_dir_acked(CDir *dir);
+ void export_dir_finish(CDir *dir);
+
friend class C_MDC_ExportFreeze;
friend class C_MDC_ExportStartLogged;
friend class C_MDS_ExportFinishLogged;
void handle_export_dir_warning(MExportDirWarning *m);
void handle_export_dir_notify(MExportDirNotify *m);
- void show_imports();
// -- hashed directories --
#include "include/types.h"
+// sent from replica to auth
class MMDSCacheRejoin : public Message {
-
- struct InodeState {
- int hardlock; // hardlock state
- int filelock; // filelock state
- int caps_wanted; // what caps bits i want
- InodeState(int cw=0, int hl=-1, int fl=-1) : hardlock(hl), filelock(fl), caps_wanted(cw) {}
- };
-
- map<inodeno_t, InodeState> inodes;
- map<inodeno_t, map<string, int> > dentries;
+ public:
+ map<inodeno_t,int> inodes; // ino -> caps_wanted
set<inodeno_t> dirs;
+ map<inodeno_t, set<string> > dentries; // dir -> (dentries...)
- public:
MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {}
char *get_type_name() { return "cache_rejoin"; }
}
void add_dir(inodeno_t dirino) {
- dirs.insert(dirino);
+ dirs.insert(dirino);
}
- void add_dentry(inodeno_t dirino, const string& dn, int ls) {
- dentries[dirino][dn] = ls;
+ void add_dentry(inodeno_t dirino, const string& dn) {
+ dentries[dirino].insert(dn);
}
- void add_inode(inodeno_t ino, int hl, int fl, int cw) {
- inodes[ino] = InodeState(cw,hl,fl);
+ void add_inode(inodeno_t ino, int cw) {
+ inodes[ino] = cw;
}
void encode_payload() {
- ::_encode(inodes, payload);
- ::_encode(dirs, payload);
+ ::_encode(inodes, payload);
+ ::_encode(dirs, payload);
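+    // dentries are keyed by entries of 'dirs'; encode them in the same
+    // (sorted set) order so decode_payload can walk 'dirs' to recover them.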
+ for (set<inodeno_t>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+ ::_encode(dentries[*p], payload);
}
void decode_payload() {
- int off = 0;
- ::_decode(inodes, payload, off);
- ::_decode(dirs, payload, off);
+ int off = 0;
+ ::_decode(inodes, payload, off);
+ ::_decode(dirs, payload, off);
+ for (set<inodeno_t>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+ ::_decode(dentries[*p], payload, off);
}
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MMDSCACHEREJOINACK_H
+#define __MMDSCACHEREJOINACK_H
+
+#include "msg/Message.h"
+
+#include "include/types.h"
+
+// sent from auth back to replica
+
+class MMDSCacheRejoinAck : public Message {
+ public:
+ struct inodeinfo {
+ inodeno_t ino;
+ int hardlock;
+ int filelock;
+ int nonce;
+ inodeinfo() {}
+ inodeinfo(inodeno_t i, int h, int f, int n) : ino(i), hardlock(h), filelock(f), nonce(n) {}
+ };
+ struct dninfo {
+ int lock;
+ int nonce;
+ dninfo() {}
+ dninfo(int l, int n) : lock(l), nonce(n) {}
+ };
+ struct dirinfo {
+ inodeno_t dirino;
+ int nonce;
+ dirinfo() {}
+ dirinfo(inodeno_t i, int n) : dirino(i), nonce(n) {}
+ };
+ list<inodeinfo> inodes;
+ map<inodeno_t, map<string,dninfo> > dentries;
+ list<dirinfo> dirs;
+
+ MMDSCacheRejoinAck() : Message(MSG_MDS_CACHEREJOINACK) {}
+
+ char *get_type_name() { return "cache_rejoin_ack"; }
+
+ void print(ostream& out) {
+    out << "cache_rejoin_ack" << endl;
+ }
+
+ void add_dir(inodeno_t dirino, int nonce) {
+ dirs.push_back(dirinfo(dirino,nonce));
+ }
+ void add_dentry(inodeno_t dirino, const string& dn, int ls, int nonce) {
+ dentries[dirino][dn] = dninfo(ls, nonce);
+ }
+ void add_inode(inodeno_t ino, int hl, int fl, int nonce) {
+ inodes.push_back(inodeinfo(ino, hl, fl, nonce));
+ }
+
+ void encode_payload() {
+ ::_encode(inodes, payload);
+ ::_encode(dirs, payload);
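+    // dentries ride along keyed by dirinfo::dirino; encode them in 'dirs'
+    // list order so decode_payload can walk the decoded list to recover them.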
+ for (list<dirinfo>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+ ::_encode(dentries[p->dirino], payload);
+ }
+ void decode_payload() {
+ int off = 0;
+ ::_decode(inodes, payload, off);
+ ::_decode(dirs, payload, off);
+ for (list<dirinfo>::iterator p = dirs.begin(); p != dirs.end(); ++p)
+ ::_decode(dentries[p->dirino], payload, off);
+ }
+};
+
+#endif
imap[im].insert(ex);
}
- void add_ambiguous_import(inodeno_t im, set<inodeno_t>& m) {
+ void add_ambiguous_import(inodeno_t im, const set<inodeno_t>& m) {
ambiguous_imap[im] = m;
}
break;
case MDSMap::STATE_STANDBY:
- if (mdsmap.is_created(*p))
+ if (mdsmap.has_created(*p))
newstate = MDSMap::STATE_OUT;
else
newstate = MDSMap::STATE_DNE;
#include "messages/MMDSBeacon.h"
#include "messages/MMDSImportMap.h"
#include "messages/MMDSCacheRejoin.h"
+#include "messages/MMDSCacheRejoinAck.h"
#include "messages/MDirUpdate.h"
#include "messages/MDiscover.h"
case MSG_MDS_CACHEREJOIN:
m = new MMDSCacheRejoin;
break;
+ case MSG_MDS_CACHEREJOINACK:
+ m = new MMDSCacheRejoinAck;
+ break;
case MSG_MDS_DIRUPDATE:
m = new MDirUpdate();
#define MSG_MDS_IMPORTMAP 106
#define MSG_MDS_CACHEREJOIN 107
+#define MSG_MDS_CACHEREJOINACK 108
#define MSG_MDS_DISCOVER 110
#define MSG_MDS_DISCOVERREPLY 111