CDir *root;
if (dir->ino() == 1) {
root = dir; // bootstrap hack.
- subtrees[root].clear();
+ if (subtrees.count(root) == 0)
+ subtrees[root].clear();
} else {
root = get_subtree_root(dir); // subtree root
}
if (bounds.count(*p) == 0) {
CDir *stray = *p;
dout(10) << " swallowing extra subtree at " << *stray << endl;
- assert(stray->auth_is_ambiguous());
adjust_subtree_auth(stray, auth);
try_subtree_merge_at(stray);
}
}
+// React to the failure of mds `who`.
+//
+// For every subtree root whose dir_auth pair names `who` on one side and an
+// mds other than us on the other side, collapse the auth to that surviving
+// mds and attempt a subtree merge.  Subtrees in which we are the other party
+// are left alone here (we only act as a _bystander_).  Afterwards the
+// migrator is told about the failure and the subtree map is dumped.
+void MDCache::handle_mds_failure(int who)
+{
+  dout(7) << "handle_mds_failure mds" << who << endl;
+
+  // Snapshot the subtree roots before touching anything.  The calls below are
+  // handed keys of `subtrees`; if a merge drops the root we are standing on,
+  // advancing a live map iterator afterwards would be undefined behaviour.
+  // NOTE(review): assumes adjust_subtree_auth()/try_subtree_merge() may
+  // insert into or erase from `subtrees` -- confirm against their definitions.
+  list<CDir*> roots;
+  for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin();
+       p != subtrees.end();
+       ++p)
+    roots.push_back(p->first);
+
+  // adjust subtree auth
+  for (list<CDir*>::iterator q = roots.begin();
+       q != roots.end();
+       ++q) {
+    CDir *dir = *q;
+
+    // no longer a subtree root (absorbed while handling an earlier root)?
+    if (subtrees.count(dir) == 0)
+      continue;
+
+    // only if we are a _bystander_.
+    int survivor;
+    if (dir->dir_auth.first == who &&
+        dir->dir_auth.second >= 0 &&
+        dir->dir_auth.second != mds->get_nodeid()) {
+      survivor = dir->dir_auth.second;   // first half of the pair failed
+    }
+    else if (dir->dir_auth.second == who &&
+             dir->dir_auth.first != mds->get_nodeid()) {
+      survivor = dir->dir_auth.first;    // second half of the pair failed
+    }
+    else
+      continue;                          // auth does not involve `who`, or involves us
+
+    dout(7) << "disambiguating auth for " << *dir << endl;
+    adjust_subtree_auth(dir, survivor);
+    try_subtree_merge(dir);
+  }
+
+  // tell the migrator too.
+  migrator->handle_mds_failure(who);
+
+  show_subtrees();
+}
+
+
+
/*
* during resolve state, we share import_maps to determine who
* is authoritative for which trees. we expect to get an import_map
}
}
- // note ambiguous imports too
- for (map<inodeno_t, list<inodeno_t> >::iterator pi = m->ambiguous_imap.begin();
- pi != m->ambiguous_imap.end();
- ++pi) {
- dout(10) << "noting ambiguous import on " << pi->first << " bounds " << pi->second << endl;
- other_ambiguous_imports[from][pi->first].swap( pi->second );
+ // note ambiguous imports too.. unless i'm already active
+ if (!mds->is_active() && !mds->is_stopping()) {
+ for (map<inodeno_t, list<inodeno_t> >::iterator pi = m->ambiguous_imap.begin();
+ pi != m->ambiguous_imap.end();
+ ++pi) {
+ dout(10) << "noting ambiguous import on " << pi->first << " bounds " << pi->second << endl;
+ other_ambiguous_imports[from][pi->first].swap( pi->second );
+ }
}
show_subtrees();
want,
true), // need this dir too
cur->authority().first, MDS_PORT_CACHE);
+ if (cur->authority().second >= 0)
+ mds->send_message_mds(new MDiscover(mds->get_nodeid(),
+ cur->ino(),
+ want,
+ true), // need this dir too
+ cur->authority().second, MDS_PORT_CACHE);
}
cur->add_waiter(CINODE_WAIT_DIR, ondelay);
if (onfinish) delete onfinish;
// MISS. don't have it.
- int dauth = cur->dir->dentry_authority( path[depth] ).first;
+ pair<int,int> dauth = cur->dir->dentry_authority( path[depth] );
dout(12) << "traverse: miss on dentry " << path[depth] << " dauth " << dauth << " in " << *cur->dir << endl;
- if (dauth == whoami) {
+ if (dauth.first == whoami) {
// dentry is mine.
if (cur->dir->is_complete()) {
// file not found
cur->ino(),
want,
false),
- dauth, MDS_PORT_CACHE);
+ dauth.first, MDS_PORT_CACHE);
+ if (dauth.second >= 0)
+ mds->send_message_mds(new MDiscover(mds->get_nodeid(),
+ cur->ino(),
+ want,
+ false),
+ dauth.second, MDS_PORT_CACHE);
+
if (mds->logger) mds->logger->inc("dis");
}
req->clear_payload(); // reencode!
}
- mds->send_message_mds(req, dauth, req->get_dest_port());
+ mds->send_message_mds(req, dauth.first, req->get_dest_port());
//show_imports();
if (mds->logger) mds->logger->inc("cfw");
export_state.erase(dir); // clean up
break;
+ // NOTE: state order reversal, warning comes after loggingstart+prepping
case EXPORT_WARNING:
dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << endl;
- export_notify_abort(dir); // tell peers about abort
+ //export_notify_abort(dir); // tell peers about abort
// fall-thru
- case EXPORT_LOGGINGSTART:
+ //case EXPORT_LOGGINGSTART:
case EXPORT_PREPPING:
if (p->second != EXPORT_WARNING)
dout(10) << "export state=loggingstart|prepping : unpinning bounds, unfreezing" << endl;
if (import_peer[dirino] == who) {
switch (import_state[dirino]) {
case IMPORT_DISCOVERED:
- dout(10) << "import state=discovered : unpinning " << *diri << endl;
+ dout(10) << "import state=discovered : unpinning inode " << *diri << endl;
assert(diri);
// unpin base
diri->put(CInode::PIN_IMPORTING);
import_peer.erase(dirino);
break;
- // NOTE: state order reversal + fall-thru, pay attention.
+ case IMPORT_PREPPING:
+ if (import_state[dirino] == IMPORT_PREPPING) {
+ dout(10) << "import state=prepping : unpinning base+bounds " << *dir << endl;
+ }
+ assert(dir);
+ import_reverse_unpin(dir); // unpin
+ break;
case IMPORT_PREPPED:
- dout(10) << "import state=prepping : unpinning base+bounds, unfreezing, " << *dir << endl;
+ dout(10) << "import state=prepping : unpinning base+bounds, unfreezing " << *dir << endl;
assert(dir);
- // unfreeze
- dir->unfreeze_tree();
-
// adjust auth back to me
cache->adjust_subtree_auth(dir, import_peer[dirino]);
cache->try_subtree_merge(dir);
- // FIXME what about bystanders
-
- // fall-thru to unpin base+bounds
-
- case IMPORT_PREPPING:
- if (import_state[dirino] == IMPORT_PREPPING) {
- dout(10) << "import state=prepping : unpinning base+bounds " << *dir << endl;
+ // bystanders?
+ if (import_bystanders[dir].empty()) {
+ import_reverse_unfreeze(dir);
+ } else {
+ // notify them; wait in aborting state
+ import_notify_abort(dir);
+ import_state[dirino] = IMPORT_ABORTING;
}
- assert(dir);
-
- // unpin base
- dir->put(CDir::PIN_IMPORTING);
-
- // unpin bounds
- for (set<CDir*>::iterator it = import_bounds[dir].begin();
- it != import_bounds[dir].end();
- it++) {
- CDir *bd = *it;
- assert(bd->state_test(CDir::STATE_IMPORTBOUND));
- bd->state_clear(CDir::STATE_IMPORTBOUND);
- bd->put(CDir::PIN_IMPORTBOUND);
- }
-
- import_state.erase(dirino);
- import_peer.erase(dirino);
- import_bound_inos.erase(dirino);
- import_bounds.erase(dir);
break;
-
case IMPORT_LOGGINGSTART:
dout(10) << "import state=loggingstart : reversing import on " << *dir << endl;
import_reverse(dir);
-
- // FIXME what about bystanders
break;
case IMPORT_ACKING:
dout(10) << "import state=acking : noting ambiguous import " << *dir << endl;
cache->add_ambiguous_import(dir, import_bounds[dir]);
break;
+
+ case IMPORT_ABORTING:
+ dout(10) << "import state=aborting : ignoring repeat failure " << *dir << endl;
+ break;
}
}
assert(dir->is_frozen());
// ok!
- export_state[dir] = EXPORT_LOGGINGSTART;
+ //export_state[dir] = EXPORT_LOGGINGSTART;
cache->show_imports();
// generate prep message, log entry.
MExportDirPrep *prep = new MExportDirPrep(dir->inode);
+ // include list of bystanders
+ for (map<int,int>::iterator p = dir->replicas_begin();
+ p != dir->replicas_end();
+ p++) {
+ if (p->first != dest) {
+ dout(10) << "bystander mds" << p->first << endl;
+ prep->add_bystander(p->first);
+ }
+ }
+
// include spanning tree for all nested exports.
// these need to be on the destination _before_ the final export so that
// dir_auth updates on any nested exports are properly absorbed.
//mds->send_message_mds(new MExportDirWarning(dir->ino(), export_peer[dir]),
//p->first, MDS_PORT_MIGRATOR);
- MExportDirNotify *notify = new MExportDirNotify(dir->ino(),
- mds->get_nodeid(), export_peer[dir]);
+ MExportDirNotify *notify = new MExportDirNotify(dir->ino(), true,
+ pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
+ pair<int,int>(mds->get_nodeid(),export_peer[dir]));
notify->copy_exports(export_bounds[dir]);
mds->send_message_mds(notify, p->first, MDS_PORT_MIGRATOR);
cache->process_delayed_expire(dir);
// tell peers
- export_notify_abort(dir);
+ //export_notify_abort(dir);
// unfreeze
dir->unfreeze_tree();
cache->show_cache();
}
+/* disabled: the calls to export_notify_abort() visible in this change are commented out as well.
void Migrator::export_notify_abort(CDir* dir)
{
dout(10) << "export_notify_abort " << *dir << endl;
for (set<int>::iterator p = export_notify_ack_waiting[dir].begin();
p != export_notify_ack_waiting[dir].end();
++p) {
- MExportDirNotify *notify = new MExportDirNotify(dir->ino(),
- mds->get_nodeid(), CDIR_AUTH_UNKNOWN);
+ MExportDirNotify *notify = new MExportDirNotify(dir->ino(), false, // NOTE(review): 'false' presumably means "no ack wanted" -- confirm against MExportDirNotify
+ pair<int,int>(mds->get_nodeid(), export_peer[dir]),
+ pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
notify->copy_exports(export_bounds[dir]);
mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
}
}
+*/
/*
* once i get the ack, and logged the EExportFinish(true),
dout(7) << "export_logged_finish " << *dir << endl;
dir->put(CDir::PIN_LOGGINGEXPORTFINISH);
- if (mds->get_nodeid() == 0 && g_clock.now() > 20.0) assert(0); // hack fake death
-
-
- if (export_state.count(dir) == 0||
- export_state[dir] != EXPORT_LOGGINGFINISH) {
- assert(0); // this won't happen.
- dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << endl;
- return;
- }
-
cache->verify_subtree_bounds(dir, export_bounds[dir]);
// send notifies
for (set<int>::iterator p = export_notify_ack_waiting[dir].begin();
p != export_notify_ack_waiting[dir].end();
++p) {
- MExportDirNotify *notify = new MExportDirNotify(dir->ino(),
- dest, CDIR_AUTH_UNKNOWN);
+ MExportDirNotify *notify;
+ if (mds->mdsmap->is_active_or_stopping(export_peer[dir]))
+ // dest is still alive.
+ notify = new MExportDirNotify(dir->ino(), true,
+ pair<int,int>(mds->get_nodeid(), dest),
+ pair<int,int>(dest, CDIR_AUTH_UNKNOWN));
+ else
+ // dest is dead. bystanders will think i am only auth, as per mdcache->handle_mds_failure()
+ notify = new MExportDirNotify(dir->ino(), true,
+ pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN),
+ pair<int,int>(dest, CDIR_AUTH_UNKNOWN));
+
notify->copy_exports(export_bounds[dir]);
mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
CInode *in = cache->get_inode(m->get_ino());
CDir *dir = in ? in->dir : 0;
- if (dir) {
- dout(7) << "handle_export_notify_ack from " << m->get_source()
- << " on " << *dir << endl;
- } else {
- dout(7) << "handle_export_notify_ack from " << m->get_source()
- << " on dir " << m->get_ino() << endl;
- }
-
- // aborted?
- if (!dir ||
- export_state.count(dir) == 0 ||
- (export_state[dir] != EXPORT_NOTIFYING &&
- export_state[dir] != EXPORT_WARNING)) {
- assert(0); // this won't happen.
- dout(7) << "target must have failed, ignoring." << endl;
- delete m;
- return;
- }
-
+ assert(dir);
int from = m->get_source().num();
- if (export_state[dir] == EXPORT_WARNING) {
- // process warning.
+ if (export_state.count(dir) && export_state[dir] == EXPORT_WARNING) {
+ // exporting. process warning.
+ dout(7) << "handle_export_notify_ack from " << m->get_source()
+ << ": exporting, processing warning on "
+ << *dir << endl;
assert(export_warning_ack_waiting.count(dir));
export_warning_ack_waiting[dir].erase(from);
if (export_warning_ack_waiting[dir].empty())
export_go(dir); // start export.
- } else {
- // process notify.
+ }
+ else if (export_state.count(dir) && export_state[dir] == EXPORT_NOTIFYING) {
+ // exporting. process notify.
+ dout(7) << "handle_export_notify_ack from " << m->get_source()
+ << ": exporting, processing notify on "
+ << *dir << endl;
assert(export_notify_ack_waiting.count(dir));
export_notify_ack_waiting[dir].erase(from);
if (export_notify_ack_waiting[dir].empty())
export_finish(dir);
}
+ else if (import_state.count(dir->ino()) && import_state[dir->ino()] == IMPORT_ABORTING) {
+ // reversing import
+ dout(7) << "handle_export_notify_ack from " << m->get_source()
+ << ": aborting import on "
+ << *dir << endl;
+ assert(import_bystanders[dir].count(from));
+ import_bystanders[dir].erase(from);
+ if (import_bystanders[dir].empty()) {
+ import_bystanders.erase(dir);
+ import_reverse_unfreeze(dir);
+ }
+ }
delete m;
}
// change import state
import_state[diri->ino()] = IMPORT_PREPPING;
+ // bystander list
+ import_bystanders[dir] = m->get_bystanders();
+ dout(7) << "bystanders are " << import_bystanders[dir] << endl;
+
// assimilate traces to exports
for (list<CInodeDiscover*>::iterator it = m->get_inodes().begin();
it != m->get_inodes().end();
assert(dir->is_auth() == false);
cache->show_imports();
-
+
// start the journal entry
EImportStart *le = new EImportStart(dir->ino(), m->get_exports());
le->metablob.add_dir_context(dir);
{
dout(7) << "import_reverse " << *dir << endl;
- // remove importing pin
- dir->put(CDir::PIN_IMPORTING);
-
- // remove bound pins
- for (set<CDir*>::iterator it = import_bounds[dir].begin();
- it != import_bounds[dir].end();
- it++) {
- CDir *bd = *it;
- bd->put(CDir::PIN_IMPORTBOUND);
- bd->state_clear(CDir::STATE_IMPORTBOUND);
- }
-
// update auth, with possible subtree merge.
if (fix_dir_auth) {
assert(dir->is_subtree_root());
}
}
+ // log our failure
+ mds->mdlog->submit_entry(new EImportFinish(dir,false)); // log failure
+
+ // bystanders?
+ if (import_bystanders[dir].empty()) {
+ dout(7) << "no bystanders, finishing reverse now" << endl;
+ import_reverse_unfreeze(dir);
+ } else {
+ // notify them; wait in aborting state
+ dout(7) << "notifying bystanders of abort" << endl;
+ import_notify_abort(dir);
+ import_state[dir->ino()] = IMPORT_ABORTING;
+ }
+}
+
+// Send an MExportDirNotify for `dir` to every mds recorded in
+// import_bystanders[dir], so each of them learns the import was aborted.
+void Migrator::import_notify_abort(CDir *dir)
+{
+  dout(7) << "import_notify_abort " << *dir << endl;
+
+  set<int>& bystanders = import_bystanders[dir];
+  set<int>::iterator who = bystanders.begin();
+  while (who != bystanders.end()) {
+    // NOTE: the bystander will think i am _only_ auth, because they will have seen
+    // the exporter's failure and updated the subtree auth.  see mdcache->handle_mds_failure().
+    pair<int,int> from_auth(mds->get_nodeid(), CDIR_AUTH_UNKNOWN);
+    pair<int,int> to_auth(import_peer[dir->ino()], CDIR_AUTH_UNKNOWN);
+
+    MExportDirNotify *abort_msg =
+      new MExportDirNotify(dir->ino(), true, from_auth, to_auth);
+    abort_msg->copy_exports(import_bounds[dir]);
+    mds->send_message_mds(abort_msg, *who, MDS_PORT_MIGRATOR);
+
+    ++who;
+  }
+}
+// Finish reversing an import of `dir`: unfreeze the tree, discard delayed expires, then unpin.
+void Migrator::import_reverse_unfreeze(CDir *dir)
+{
+ dout(7) << "import_reverse_unfreeze " << *dir << endl;
+
// unfreeze
dir->unfreeze_tree();
// discard expire crap
cache->discard_delayed_expire(dir);
+
+ import_reverse_unpin(dir); // drops the import pins and erases this import's bookkeeping
+}
- // log our failure
- mds->mdlog->submit_entry(new EImportFinish(dir,false)); // log failure
+void Migrator::import_reverse_unpin(CDir *dir)
+{
+ dout(7) << "import_reverse_unpin " << *dir << endl;
+
+ // remove importing pin
+ dir->put(CDir::PIN_IMPORTING);
+
+ // remove bound pins
+ for (set<CDir*>::iterator it = import_bounds[dir].begin();
+ it != import_bounds[dir].end();
+ it++) {
+ CDir *bd = *it;
+ bd->put(CDir::PIN_IMPORTBOUND);
+ bd->state_clear(CDir::STATE_IMPORTBOUND);
+ }
// clean up
import_state.erase(dir->ino());
import_peer.erase(dir->ino());
import_bound_inos.erase(dir->ino());
import_bounds.erase(dir);
+ import_bystanders.erase(dir);
cache->show_subtrees();
cache->show_cache();
import_peer.erase(dir->ino());
import_bound_inos.erase(dir->ino());
import_bounds.erase(dir);
+ import_bystanders.erase(dir);
// process delayed expires
cache->process_delayed_expire(dir);
CDir *dir = cache->get_dir(m->get_ino());
int from = m->get_source().num();
- pair<int,int> auth = m->get_auth();
+ pair<int,int> old_auth = m->get_old_auth();
+ pair<int,int> new_auth = m->get_new_auth();
if (!dir) {
- dout(7) << "handle_export_notify " << auth
+ dout(7) << "handle_export_notify " << old_auth << " -> " << new_auth
<< " on missing dir " << m->get_ino() << endl;
+ } else if (dir->authority() != old_auth) {
+ dout(7) << "handle_export_notify old_auth was " << dir->authority()
+ << " != " << old_auth << " -> " << new_auth
+ << " on " << *dir << endl;
} else {
- dout(7) << "handle_export_notify " << auth
+ dout(7) << "handle_export_notify " << old_auth << " -> " << new_auth
<< " on " << *dir << endl;
// adjust auth
- cache->adjust_bounded_subtree_auth(dir, m->get_exports(), auth);
+ cache->adjust_bounded_subtree_auth(dir, m->get_exports(), new_auth);
// induce a merge?
cache->try_subtree_merge(dir);
}
// send ack
- if (auth.first == from &&
- auth.second == CDIR_AUTH_UNKNOWN) {
- // aborted. no ack.
- dout(7) << "handle_export_notify mds" << auth.first
- << " aborted export, not sending ack for "
- << *dir << endl;
- } else {
+ if (m->wants_ack()) {
mds->send_message_mds(new MExportDirNotifyAck(m->get_ino()),
from, MDS_PORT_MIGRATOR);
+ } else {
+ // aborted. no ack.
+ dout(7) << "handle_export_notify no ack requested" << endl;
}
delete m;