void Migrator::export_try_cancel(CDir *dir)
{
- int state = export_state[dir];
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ assert(it != export_state.end());
+
+ int state = it->second.state;
switch (state) {
case EXPORT_DISCOVERING:
dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
+ it->second.state = EXPORT_CANCELLED;
dir->unfreeze_tree(); // cancel the freeze
dir->auth_unpin(this);
- export_state.erase(dir); // clean up
export_unlock(dir);
- export_locks.erase(dir);
export_freeze_finish(dir);
dir->state_clear(CDir::STATE_EXPORTING);
- if (mds->mdsmap->is_clientreplay_or_active_or_stopping(export_peer[dir])) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
+ if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
break;
case EXPORT_FREEZING:
dout(10) << "export state=freezing : canceling freeze" << dendl;
+ it->second.state = EXPORT_CANCELLED;
dir->unfreeze_tree(); // cancel the freeze
- export_state.erase(dir); // clean up
export_freeze_finish(dir);
dir->state_clear(CDir::STATE_EXPORTING);
- if (mds->mdsmap->is_clientreplay_or_active_or_stopping(export_peer[dir])) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
+ if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
break;
// NOTE: state order reversal, warning comes after prepping
case EXPORT_PREPPING:
if (state != EXPORT_WARNING)
dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
+
+ it->second.state = EXPORT_CANCELLED;
{
// unpin bounds
set<CDir*> bounds;
export_notify_abort(dir, bounds);
}
dir->unfreeze_tree();
- export_state.erase(dir); // clean up
cache->adjust_subtree_auth(dir, mds->get_nodeid());
cache->try_subtree_merge(dir); // NOTE: this may journal subtree_map as side effect
export_unlock(dir);
- export_locks.erase(dir);
dir->state_clear(CDir::STATE_EXPORTING);
- if (mds->mdsmap->is_clientreplay_or_active_or_stopping(export_peer[dir])) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
+ if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
break;
case EXPORT_EXPORTING:
dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
+ it->second.state = EXPORT_CANCELLED;
export_reverse(dir);
- export_state.erase(dir); // clean up
- export_locks.erase(dir);
dir->state_clear(CDir::STATE_EXPORTING);
break;
}
// finish clean-up?
- if (export_state.count(dir) == 0) {
- export_peer.erase(dir);
- export_warning_ack_waiting.erase(dir);
- export_notify_ack_waiting.erase(dir);
-
+ if (it->second.state == EXPORT_CANCELLED) {
// wake up any waiters
- mds->queue_waiters(export_finish_waiters[dir]);
- export_finish_waiters.erase(dir);
+ mds->queue_waiters(it->second.waiting_for_finish);
+
+ export_state.erase(it);
// send pending import_maps? (these need to go out when all exports have finished.)
cache->maybe_send_pending_resolves();
// freeze. this way no freeze completions run before we want them
// to.
list<CDir*> pinned_dirs;
- for (map<CDir*,int>::iterator p = export_state.begin();
+ for (map<CDir*,export_state_t>::iterator p = export_state.begin();
p != export_state.end();
++p) {
- if (p->second == EXPORT_FREEZING) {
+ if (p->second.state == EXPORT_FREEZING) {
CDir *dir = p->first;
dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
dir->auth_pin(this);
}
}
- map<CDir*,int>::iterator p = export_state.begin();
+ map<CDir*,export_state_t>::iterator p = export_state.begin();
while (p != export_state.end()) {
- map<CDir*,int>::iterator next = p;
+ map<CDir*,export_state_t>::iterator next = p;
++next;
CDir *dir = p->first;
// - that are going to the failed node
// - that aren't frozen yet (to avoid auth_pin deadlock)
// - they havne't prepped yet (they may need to discover bounds to do that)
- if (export_peer[dir] == who ||
- p->second == EXPORT_DISCOVERING ||
- p->second == EXPORT_FREEZING ||
- p->second == EXPORT_PREPPING) {
+ if (p->second.peer == who ||
+ p->second.state == EXPORT_DISCOVERING ||
+ p->second.state == EXPORT_FREEZING ||
+ p->second.state == EXPORT_PREPPING) {
// the guy i'm exporting to failed, or we're just freezing.
- dout(10) << "cleaning up export state (" << p->second << ")" << get_export_statename(p->second)
- << " of " << *dir << dendl;
+ dout(10) << "cleaning up export state (" << p->second.state << ")"
+ << get_export_statename(p->second.state) << " of " << *dir << dendl;
export_try_cancel(dir);
} else {
// bystander failed.
- if (export_warning_ack_waiting.count(dir) &&
- export_warning_ack_waiting[dir].count(who)) {
- export_warning_ack_waiting[dir].erase(who);
- export_notify_ack_waiting[dir].erase(who); // they won't get a notify either.
- if (p->second == EXPORT_WARNING) {
+ if (p->second.warning_ack_waiting.count(who)) {
+ p->second.warning_ack_waiting.erase(who);
+ p->second.notify_ack_waiting.erase(who); // they won't get a notify either.
+ if (p->second.state == EXPORT_WARNING) {
// exporter waiting for warning acks, let's fake theirs.
dout(10) << "faking export_warning_ack from mds." << who
- << " on " << *dir << " to mds." << export_peer[dir]
+ << " on " << *dir << " to mds." << p->second.peer
<< dendl;
- if (export_warning_ack_waiting[dir].empty())
+ if (p->second.warning_ack_waiting.empty())
export_go(dir);
}
}
- if (export_notify_ack_waiting.count(dir) &&
- export_notify_ack_waiting[dir].count(who)) {
- export_notify_ack_waiting[dir].erase(who);
- if (p->second == EXPORT_NOTIFYING) {
+ if (p->second.notify_ack_waiting.count(who)) {
+ p->second.notify_ack_waiting.erase(who);
+ if (p->second.state == EXPORT_NOTIFYING) {
// exporter is waiting for notify acks, fake it
dout(10) << "faking export_notify_ack from mds." << who
- << " on " << *dir << " to mds." << export_peer[dir]
+ << " on " << *dir << " to mds." << p->second.peer
<< dendl;
- if (export_notify_ack_waiting[dir].empty())
+ if (p->second.notify_ack_waiting.empty())
export_finish(dir);
}
}
// check my imports
- map<dirfrag_t,int>::iterator q = import_state.begin();
+ map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
while (q != import_state.end()) {
- map<dirfrag_t,int>::iterator next = q;
+ map<dirfrag_t,import_state_t>::iterator next = q;
++next;
dirfrag_t df = q->first;
CInode *diri = mds->mdcache->get_inode(df.ino);
CDir *dir = mds->mdcache->get_dirfrag(df);
- if (import_peer[df] == who) {
+ if (q->second.peer == who) {
if (dir)
- dout(10) << "cleaning up import state (" << q->second << ")" << get_import_statename(q->second)
- << " of " << *dir << dendl;
+ dout(10) << "cleaning up import state (" << q->second.state << ")"
+ << get_import_statename(q->second.state) << " of " << *dir << dendl;
else
- dout(10) << "cleaning up import state (" << q->second << ")" << get_import_statename(q->second)
- << " of " << df << dendl;
+ dout(10) << "cleaning up import state (" << q->second.state << ")"
+ << get_import_statename(q->second.state) << " of " << df << dendl;
- switch (q->second) {
+ switch (q->second.state) {
case IMPORT_DISCOVERING:
dout(10) << "import state=discovering : clearing state" << dendl;
import_reverse_discovering(df);
import_remove_pins(dir, bounds);
// adjust auth back to the exporter
- cache->adjust_subtree_auth(dir, import_peer[df]);
+ cache->adjust_subtree_auth(dir, q->second.peer);
cache->try_subtree_merge(dir); // NOTE: may journal subtree_map as side-effect
// bystanders?
- if (import_bystanders[dir].empty()) {
+ if (q->second.bystanders.empty()) {
import_reverse_unfreeze(dir);
} else {
// notify them; wait in aborting state
import_notify_abort(dir, bounds);
- import_state[df] = IMPORT_ABORTING;
+ import_state[df].state = IMPORT_ABORTING;
assert(g_conf->mds_kill_import_at != 10);
}
}
break;
}
} else {
- if (q->second == IMPORT_ABORTING &&
- import_bystanders[dir].count(who)) {
+ if (q->second.state == IMPORT_ABORTING &&
+ q->second.bystanders.count(who)) {
assert(dir);
dout(10) << "faking export_notify_ack from mds." << who
- << " on aborting import " << *dir << " from mds." << import_peer[df]
+ << " on aborting import " << *dir << " from mds." << q->second.peer
<< dendl;
- import_bystanders[dir].erase(who);
- if (import_bystanders[dir].empty()) {
- import_bystanders.erase(dir);
+ q->second.bystanders.erase(who);
+ if (q->second.bystanders.empty()) {
import_reverse_unfreeze(dir);
}
}
void Migrator::show_importing()
{
dout(10) << "show_importing" << dendl;
+ // Debug dump: one log line per in-flight import, showing source peer,
+ // state (numeric + symbolic name) and the dirfrag being imported.
- for (map<dirfrag_t,int>::iterator p = import_state.begin();
+ for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
p != import_state.end();
++p) {
CDir *dir = mds->mdcache->get_dirfrag(p->first);
if (dir) {
- dout(10) << " importing from " << import_peer[p->first]
- << ": (" << p->second << ") " << get_import_statename(p->second)
- << " " << p->first
- << " " << *dir
- << dendl;
+ // dirfrag is present in cache: include the CDir itself in the line
+ dout(10) << " importing from " << p->second.peer
+ << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
+ << " " << p->first << " " << *dir << dendl;
} else {
- dout(10) << " importing from " << import_peer[p->first]
- << ": (" << p->second << ") " << get_import_statename(p->second)
- << " " << p->first
- << dendl;
+ // dirfrag not (yet) instantiated locally: log the dirfrag id only
+ dout(10) << " importing from " << p->second.peer
+ << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
+ << " " << p->first << dendl;
}
}
}
void Migrator::show_exporting()
{
dout(10) << "show_exporting" << dendl;
+ // Debug dump: one log line per in-flight export, showing target peer,
+ // state (numeric + symbolic name), the dirfrag id and the CDir.
+ // Exports always have the CDir in cache (we are the auth), so unlike
+ // show_importing there is no missing-dir branch.
- for (map<CDir*,int>::iterator p = export_state.begin();
+ for (map<CDir*,export_state_t>::iterator p = export_state.begin();
p != export_state.end();
++p)
- dout(10) << " exporting to " << export_peer[p->first]
- << ": (" << p->second << ") " << get_export_statename(p->second)
- << " " << p->first->dirfrag()
- << " " << *p->first
- << dendl;
+ dout(10) << " exporting to " << p->second.peer
+ << ": (" << p->second.state << ") " << get_export_statename(p->second.state)
+ << " " << p->first->dirfrag() << " " << *p->first << dendl;
}
// import_state
show_importing();
- for (map<dirfrag_t,int>::iterator p = import_state.begin();
+ for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
p != import_state.end();
++p) {
- if (p->second == IMPORT_DISCOVERING)
+ if (p->second.state == IMPORT_DISCOVERING)
continue;
- if (p->second == IMPORT_DISCOVERED) {
+ if (p->second.state == IMPORT_DISCOVERED) {
CInode *in = cache->get_inode(p->first.ino);
assert(in);
continue;
}
CDir *dir = cache->get_dirfrag(p->first);
assert(dir);
- if (p->second == IMPORT_PREPPING)
+ if (p->second.state == IMPORT_PREPPING)
continue;
- if (p->second == IMPORT_ABORTING) {
+ if (p->second.state == IMPORT_ABORTING) {
assert(!dir->is_ambiguous_dir_auth());
assert(dir->get_dir_auth().first != mds->get_nodeid());
continue;
// export_state
show_exporting();
- for (map<CDir*,int>::iterator p = export_state.begin();
+ for (map<CDir*,export_state_t>::iterator p = export_state.begin();
p != export_state.end();
++p) {
CDir *dir = p->first;
- if (p->second == EXPORT_DISCOVERING ||
- p->second == EXPORT_FREEZING) continue;
+ if (p->second.state == EXPORT_DISCOVERING ||
+ p->second.state == EXPORT_FREEZING) continue;
assert(dir->is_ambiguous_dir_auth());
assert(dir->authority().first == mds->get_nodeid() ||
dir->authority().second == mds->get_nodeid());
dout(7) << "export_dir can't rdlock needed locks, failing." << dendl;
return;
}
- mds->locker->rdlock_take_set(locks);
- export_locks[dir].swap(locks);
// ok.
+ mds->locker->rdlock_take_set(locks);
+
assert(export_state.count(dir) == 0);
- export_state[dir] = EXPORT_DISCOVERING;
- export_peer[dir] = dest;
+ export_state_t& stat = export_state[dir];
+ stat.state = EXPORT_DISCOVERING;
+ stat.peer = dest;
+ stat.locks.swap(locks);
dir->state_set(CDir::STATE_EXPORTING);
assert(g_conf->mds_kill_export_at != 1);
dout(7) << "export_discover_ack from " << m->get_source()
<< " on " << *dir << dendl;
- if (export_state.count(dir) == 0 ||
- export_state[dir] != EXPORT_DISCOVERING ||
- export_peer[dir] != m->get_source().num()) {
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ if (it == export_state.end() ||
+ it->second.state != EXPORT_DISCOVERING ||
+ it->second.peer != m->get_source().num()) {
dout(7) << "must have aborted" << dendl;
} else {
// release locks to avoid deadlock
export_unlock(dir);
- export_locks.erase(dir);
// freeze the subtree
- export_state[dir] = EXPORT_FREEZING;
+ it->second.state = EXPORT_FREEZING;
dir->auth_unpin(this);
assert(g_conf->mds_kill_export_at != 3);
}
assert(dir->is_frozen());
assert(dir->get_cum_auth_pins() == 0);
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ assert(it != export_state.end());
+ assert(it->second.state == EXPORT_FREEZING);
+
export_freeze_finish(dir);
- int dest = export_peer[dir];
CInode *diri = dir->inode;
// ok, try to grab all my locks.
<< *diri << dendl;
// .. unwind ..
- export_peer.erase(dir);
- export_state.erase(dir);
dir->unfreeze_tree();
dir->state_clear(CDir::STATE_EXPORTING);
+ mds->queue_waiters(it->second.waiting_for_finish);
- mds->queue_waiters(export_finish_waiters[dir]);
- export_finish_waiters.erase(dir);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), dest);
+ export_state.erase(it);
return;
}
+
mds->locker->rdlock_take_set(locks);
- export_locks[dir].swap(locks);
+ it->second.locks.swap(locks);
cache->show_subtrees();
for (map<int,int>::iterator p = dir->replicas_begin();
p != dir->replicas_end();
++p) {
- if (p->first != dest) {
+ if (p->first != it->second.peer) {
dout(10) << "bystander mds." << p->first << dendl;
prep->add_bystander(p->first);
}
}
// include base dirfrag
- cache->replicate_dir(dir, dest, prep->basedir);
+ cache->replicate_dir(dir, it->second.peer, prep->basedir);
/*
* include spanning tree for all nested exports.
// prepend dentry + inode
assert(cur->inode->is_auth());
bufferlist bl;
- cache->replicate_dentry(cur->inode->parent, dest, bl);
+ cache->replicate_dentry(cur->inode->parent, it->second.peer, bl);
dout(7) << " added " << *cur->inode->parent << dendl;
- cache->replicate_inode(cur->inode, dest, bl);
+ cache->replicate_inode(cur->inode, it->second.peer, bl);
dout(7) << " added " << *cur->inode << dendl;
bl.claim_append(tracebl);
tracebl.claim(bl);
dirfrags_added.insert(cur->dirfrag());
// prepend dir
- cache->replicate_dir(cur, dest, bl);
+ cache->replicate_dir(cur, it->second.peer, bl);
dout(7) << " added " << *cur << dendl;
bl.claim_append(tracebl);
tracebl.claim(bl);
}
// send.
- export_state[dir] = EXPORT_PREPPING;
- mds->send_message_mds(prep, dest);
+ it->second.state = EXPORT_PREPPING;
+ mds->send_message_mds(prep, it->second.peer);
assert (g_conf->mds_kill_export_at != 4);
}
dout(7) << "export_prep_ack " << *dir << dendl;
- if (export_state.count(dir) == 0 ||
- export_state[dir] != EXPORT_PREPPING) {
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ if (it == export_state.end() ||
+ it->second.state != EXPORT_PREPPING) {
// export must have aborted.
dout(7) << "export must have aborted" << dendl;
m->put();
assert (g_conf->mds_kill_export_at != 5);
// send warnings
- int dest = export_peer[dir];
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
- assert(export_peer.count(dir));
- assert(export_warning_ack_waiting.count(dir) == 0);
- assert(export_notify_ack_waiting.count(dir) == 0);
+ assert(it->second.warning_ack_waiting.empty());
+ assert(it->second.notify_ack_waiting.empty());
for (map<int,int>::iterator p = dir->replicas_begin();
p != dir->replicas_end();
++p) {
- if (p->first == dest) continue;
+ if (p->first == it->second.peer) continue;
if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first))
continue; // only if active
- export_warning_ack_waiting[dir].insert(p->first);
- export_notify_ack_waiting[dir].insert(p->first); // we'll eventually get a notifyack, too!
+ it->second.warning_ack_waiting.insert(p->first);
+ it->second.notify_ack_waiting.insert(p->first); // we'll eventually get a notifyack, too!
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
- pair<int,int>(mds->get_nodeid(),export_peer[dir]));
- for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
- notify->get_bounds().push_back((*i)->dirfrag());
+ pair<int,int>(mds->get_nodeid(),it->second.peer));
+ for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
+ notify->get_bounds().push_back((*q)->dirfrag());
mds->send_message_mds(notify, p->first);
}
- export_state[dir] = EXPORT_WARNING;
+
+ it->second.state = EXPORT_WARNING;
assert(g_conf->mds_kill_export_at != 6);
// nobody to warn?
- if (export_warning_ack_waiting.count(dir) == 0)
+ if (it->second.warning_ack_waiting.empty())
export_go(dir); // start export.
// done.
void Migrator::export_go(CDir *dir)
{
- assert(export_peer.count(dir));
- int dest = export_peer[dir];
- dout(7) << "export_go " << *dir << " to " << dest << dendl;
+ // The export must already be registered; the peer is looked up from the
+ // consolidated export_state_t record rather than the old export_peer map.
+ assert(export_state.count(dir));
+ dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
// first sync log to flush out e.g. any cap imports
+ // C_M_ExportGo re-enters via export_go_synced() once the journal is safe;
+ // that callback re-validates state, since the export may be canceled meanwhile.
mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir));
}
void Migrator::export_go_synced(CDir *dir)
-{
- if (export_state.count(dir) == 0 ||
- export_state[dir] != EXPORT_WARNING) {
+{
+
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ if (it == export_state.end() ||
+ it->second.state != EXPORT_WARNING) {
// export must have aborted.
dout(7) << "export must have aborted on " << dir << dendl;
return;
}
- assert(export_peer.count(dir));
- int dest = export_peer[dir];
- dout(7) << "export_go_synced " << *dir << " to " << dest << dendl;
+ dout(7) << "export_go_synced " << *dir << " to " << it->second.peer << dendl;
cache->show_subtrees();
- export_warning_ack_waiting.erase(dir);
- export_state[dir] = EXPORT_EXPORTING;
+ it->second.state = EXPORT_EXPORTING;
assert(g_conf->mds_kill_export_at != 7);
assert(dir->get_cum_auth_pins() == 0);
// set ambiguous auth
- cache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);
+ cache->adjust_subtree_auth(dir, mds->get_nodeid(), it->second.peer);
// take away the popularity we're sending.
utime_t now = ceph_clock_now(g_ceph_context);
req->add_export((*p)->dirfrag());
// send
- mds->send_message_mds(req, dest);
+ mds->send_message_mds(req, it->second.peer);
assert(g_conf->mds_kill_export_at != 8);
// stats
// yay!
dout(7) << "handle_export_ack " << *dir << dendl;
- export_warning_ack_waiting.erase(dir);
-
- export_state[dir] = EXPORT_LOGGINGFINISH;
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ assert(it != export_state.end());
+ assert(it->second.state == EXPORT_EXPORTING);
+
+ it->second.state = EXPORT_LOGGINGFINISH;
assert (g_conf->mds_kill_export_at != 9);
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
// list us second, them first.
// this keeps authority().first in sync with subtree auth state in the journal.
- int target = export_peer[dir];
- cache->adjust_subtree_auth(dir, target, mds->get_nodeid());
+ cache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
// log completion.
// include export bounds, to ensure they're in the journal.
{
dout(7) << "export_notify_abort " << *dir << dendl;
- for (set<int>::iterator p = export_notify_ack_waiting[dir].begin();
- p != export_notify_ack_waiting[dir].end();
+ export_state_t& stat = export_state[dir];
+
+ for (set<int>::iterator p = stat.notify_ack_waiting.begin();
+ p != stat.notify_ack_waiting.end();
++p) {
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), false,
- pair<int,int>(mds->get_nodeid(),export_peer[dir]),
+ pair<int,int>(mds->get_nodeid(),stat.peer),
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
void Migrator::export_reverse(CDir *dir)
{
dout(7) << "export_reverse " << *dir << dendl;
-
- assert(export_state[dir] == EXPORT_EXPORTING);
-
+
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
// process delayed expires
cache->process_delayed_expire(dir);
- // some clean up
- export_warning_ack_waiting.erase(dir);
- export_notify_ack_waiting.erase(dir);
-
// unfreeze
dir->unfreeze_tree();
{
dout(7) << "export_logged_finish " << *dir << dendl;
- // send notifies
- int dest = export_peer[dir];
+ export_state_t& stat = export_state[dir];
+ // send notifies
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
- for (set<int>::iterator p = export_notify_ack_waiting[dir].begin();
- p != export_notify_ack_waiting[dir].end();
+ for (set<int>::iterator p = stat.notify_ack_waiting.begin();
+ p != stat.notify_ack_waiting.end();
++p) {
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
- pair<int,int>(mds->get_nodeid(), dest),
- pair<int,int>(dest, CDIR_AUTH_UNKNOWN));
+ pair<int,int>(mds->get_nodeid(), stat.peer),
+ pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
}
// wait for notifyacks
- export_state[dir] = EXPORT_NOTIFYING;
+ stat.state = EXPORT_NOTIFYING;
assert (g_conf->mds_kill_export_at != 11);
// no notifies to wait for?
- if (export_notify_ack_waiting[dir].empty())
+ if (stat.notify_ack_waiting.empty())
export_finish(dir); // skip notify/notify_ack stage.
}
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
int from = m->get_source().num();
-
- if (export_state.count(dir) && export_state[dir] == EXPORT_WARNING) {
- // exporting. process warning.
- dout(7) << "handle_export_notify_ack from " << m->get_source()
- << ": exporting, processing warning on "
- << *dir << dendl;
- assert(export_warning_ack_waiting.count(dir));
- export_warning_ack_waiting[dir].erase(from);
-
- if (export_warning_ack_waiting[dir].empty())
- export_go(dir); // start export.
- }
- else if (export_state.count(dir) && export_state[dir] == EXPORT_NOTIFYING) {
- // exporting. process notify.
- dout(7) << "handle_export_notify_ack from " << m->get_source()
- << ": exporting, processing notify on "
- << *dir << dendl;
- assert(export_notify_ack_waiting.count(dir));
- export_notify_ack_waiting[dir].erase(from);
-
- if (export_notify_ack_waiting[dir].empty())
- export_finish(dir);
- }
- else if (import_state.count(dir->dirfrag()) && import_state[dir->dirfrag()] == IMPORT_ABORTING) {
- // reversing import
- dout(7) << "handle_export_notify_ack from " << m->get_source()
- << ": aborting import on "
- << *dir << dendl;
- assert(import_bystanders[dir].count(from));
- import_bystanders[dir].erase(from);
- if (import_bystanders[dir].empty()) {
- import_bystanders.erase(dir);
- import_reverse_unfreeze(dir);
+
+ if (export_state.count(dir)) {
+ export_state_t& stat = export_state[dir];
+ if (stat.state == EXPORT_WARNING) {
+ // exporting. process warning.
+ dout(7) << "handle_export_notify_ack from " << m->get_source()
+ << ": exporting, processing warning on " << *dir << dendl;
+ stat.warning_ack_waiting.erase(from);
+
+ if (stat.warning_ack_waiting.empty())
+ export_go(dir); // start export.
+ } else if (stat.state == EXPORT_NOTIFYING) {
+ // exporting. process notify.
+ dout(7) << "handle_export_notify_ack from " << m->get_source()
+ << ": exporting, processing notify on " << *dir << dendl;
+ stat.notify_ack_waiting.erase(from);
+
+ if (stat.notify_ack_waiting.empty())
+ export_finish(dir);
+ }
+ }
+ else if (import_state.count(dir->dirfrag())) {
+ import_state_t& stat = import_state[dir->dirfrag()];
+ if (stat.state == IMPORT_ABORTING) {
+ // reversing import
+ dout(7) << "handle_export_notify_ack from " << m->get_source()
+ << ": aborting import on " << *dir << dendl;
+ assert(stat.bystanders.count(from));
+ stat.bystanders.erase(from);
+ if (stat.bystanders.empty())
+ import_reverse_unfreeze(dir);
}
}
{
dout(10) << "export_unlock " << *dir << dendl;
- mds->locker->rdlock_finish_set(export_locks[dir]);
+ mds->locker->rdlock_finish_set(export_state[dir].locks);
+ export_state[dir].locks.clear();
list<Context*> ls;
mds->queue_waiters(ls);
dout(5) << "export_finish " << *dir << dendl;
assert (g_conf->mds_kill_export_at != 12);
- if (export_state.count(dir) == 0) {
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ if (it == export_state.end()) {
dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << dendl;
return;
}
// send finish/commit to new auth
- if (mds->mdsmap->is_clientreplay_or_active_or_stopping(export_peer[dir])) {
- mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), export_peer[dir]);
+ if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
+ mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), it->second.peer);
} else {
dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
}
// adjust auth, with possible subtree merge.
// (we do this _after_ removing EXPORTBOUND pins, to allow merges)
- cache->adjust_subtree_auth(dir, export_peer[dir]);
+ cache->adjust_subtree_auth(dir, it->second.peer);
cache->try_subtree_merge(dir); // NOTE: may journal subtree_map as sideeffect
// no more auth subtree? clear scatter dirty
// remove from exporting list, clean up state
dir->state_clear(CDir::STATE_EXPORTING);
- export_state.erase(dir);
- export_locks.erase(dir);
- export_peer.erase(dir);
- export_notify_ack_waiting.erase(dir);
// queue finishers
- mds->queue_waiters(export_finish_waiters[dir]);
- export_finish_waiters.erase(dir);
+ mds->queue_waiters(it->second.waiting_for_finish);
+
+ export_state.erase(it);
cache->show_subtrees();
audit();
if (!m->started) {
m->started = true;
import_pending_msg[df] = m;
- import_state[df] = IMPORT_DISCOVERING;
- import_peer[df] = from;
+ import_state[df].state = IMPORT_DISCOVERING;
+ import_state[df].peer = from;
} else {
// am i retrying after ancient path_traverse results?
if (import_pending_msg.count(df) == 0 || import_pending_msg[df] != m) {
// yay
dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
- import_state[m->get_dirfrag()] = IMPORT_DISCOVERED;
+ import_state[m->get_dirfrag()].state = IMPORT_DISCOVERED;
import_pending_msg.erase(m->get_dirfrag());
// pin inode in the cache (for now)
// reply
dout(7) << " sending export_discover_ack on " << *in << dendl;
- mds->send_message_mds(new MExportDirDiscoverAck(df), import_peer[df]);
+ mds->send_message_mds(new MExportDirDiscoverAck(df), import_state[df].peer);
m->put();
assert (g_conf->mds_kill_import_at != 2);
}
{
import_pending_msg.erase(df);
import_state.erase(df);
- import_peer.erase(df);
}
void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
// unpin base
diri->put(CInode::PIN_IMPORTING);
import_state.erase(df);
- import_peer.erase(df);
}
void Migrator::import_reverse_prepping(CDir *dir)
{
import_pending_msg.erase(dir->dirfrag());
set<CDir*> bounds;
- cache->map_dirfrag_set(import_bound_ls[dir], bounds);
+ cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
import_remove_pins(dir, bounds);
import_reverse_final(dir);
}
{
dout(7) << "handle_export_cancel on " << m->get_dirfrag() << dendl;
dirfrag_t df = m->get_dirfrag();
- if (import_state[df] == IMPORT_DISCOVERING) {
+ map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
+ if (it == import_state.end()) {
+ assert(0 == "got export_cancel in weird state");
+ } else if (it->second.state == IMPORT_DISCOVERING) {
import_reverse_discovering(df);
- } else if (import_state[df] == IMPORT_DISCOVERED) {
+ } else if (it->second.state == IMPORT_DISCOVERED) {
CInode *in = cache->get_inode(df.ino);
assert(in);
import_reverse_discovered(df, in);
- } else if (import_state[df] == IMPORT_PREPPING) {
+ } else if (it->second.state == IMPORT_PREPPING) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
import_reverse_prepping(dir);
- } else if (import_state[df] == IMPORT_PREPPED) {
+ } else if (it->second.state == IMPORT_PREPPED) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
import_remove_pins(dir, bounds);
// adjust auth back to the exportor
- cache->adjust_subtree_auth(dir, import_peer[df]);
+ cache->adjust_subtree_auth(dir, it->second.peer);
cache->try_subtree_merge(dir);
import_reverse_unfreeze(dir);
} else {
list<Context*> finished;
// assimilate root dir.
+ map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
if (!m->did_assim()) {
+ assert(it != import_state.end());
diri = cache->get_inode(m->get_dirfrag().ino);
assert(diri);
bufferlist::iterator p = m->basedir.begin();
import_pending_msg[dir->dirfrag()] = m;
// change import state
- import_state[dir->dirfrag()] = IMPORT_PREPPING;
- import_bound_ls[dir] = m->get_bounds();
+ it->second.state = IMPORT_PREPPING;
+ it->second.bound_ls = m->get_bounds();
+ it->second.bystanders = m->get_bystanders();
assert(g_conf->mds_kill_import_at != 3);
+ // bystander list
+ dout(7) << "bystanders are " << it->second.bystanders << dendl;
+
// move pin to dir
diri->put(CInode::PIN_IMPORTING);
dir->get(CDir::PIN_IMPORTING);
dir->state_set(CDir::STATE_IMPORTING);
-
- // bystander list
- import_bystanders[dir] = m->get_bystanders();
- dout(7) << "bystanders are " << import_bystanders[dir] << dendl;
// assimilate traces to exports
// each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
mds->send_message(new MExportDirPrepAck(dir->dirfrag()), m->get_connection());
// note new state
- import_state[dir->dirfrag()] = IMPORT_PREPPED;
+ it->second.state = IMPORT_PREPPED;
import_pending_msg.erase(dir->dirfrag());
assert(g_conf->mds_kill_import_at != 4);
// done
dir, // import root
le,
mds->mdlog->get_current_segment(),
- import_caps[dir],
- import_updated_scatterlocks[dir],
+ import_state[dir->dirfrag()].peer_exports,
+ import_state[dir->dirfrag()].updated_scatterlocks,
now);
}
dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
dout(7) << "handle_export_dir did " << *dir << dendl;
// note state
- import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
+ import_state[dir->dirfrag()].state = IMPORT_LOGGINGSTART;
assert (g_conf->mds_kill_import_at != 6);
// log it
*/
void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
{
+ import_state_t& stat = import_state[dir->dirfrag()];
// root
dir->put(CDir::PIN_IMPORTING);
dir->state_clear(CDir::STATE_IMPORTING);
// bounding inodes
set<inodeno_t> did;
- for (list<dirfrag_t>::iterator p = import_bound_ls[dir].begin();
- p != import_bound_ls[dir].end();
+ for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
+ p != stat.bound_ls.end();
++p) {
if (did.count(p->ino))
continue;
in->put_stickydirs();
}
- if (import_state[dir->dirfrag()] >= IMPORT_PREPPED) {
+ if (stat.state >= IMPORT_PREPPED) {
// bounding dirfrags
for (set<CDir*>::iterator it = bounds.begin();
it != bounds.end();
{
dout(7) << "import_reverse " << *dir << dendl;
+ import_state_t& stat = import_state[dir->dirfrag()];
+
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
assert(dir->is_subtree_root());
if (mds->is_resolve())
cache->trim_non_auth_subtree(dir);
- cache->adjust_subtree_auth(dir, import_peer[dir->dirfrag()]);
+
+ cache->adjust_subtree_auth(dir, stat.peer);
if (!dir->get_inode()->is_auth() &&
!dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid()))
}
// reexport caps
- for (map<CInode*, map<client_t,Capability::Export> >::iterator p = import_caps[dir].begin();
- p != import_caps[dir].end();
+ for (map<CInode*, map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
+ p != stat.peer_exports.end();
++p) {
CInode *in = p->first;
dout(20) << " reexporting caps on " << *in << dendl;
cache->try_subtree_merge(dir); // NOTE: this may journal subtree map as side effect
// bystanders?
- if (import_bystanders[dir].empty()) {
+ if (stat.bystanders.empty()) {
dout(7) << "no bystanders, finishing reverse now" << dendl;
import_reverse_unfreeze(dir);
} else {
// notify them; wait in aborting state
dout(7) << "notifying bystanders of abort" << dendl;
import_notify_abort(dir, bounds);
- import_state[dir->dirfrag()] = IMPORT_ABORTING;
+ stat.state = IMPORT_ABORTING;
assert (g_conf->mds_kill_import_at != 10);
}
}
{
dout(7) << "import_notify_finish " << *dir << dendl;
- for (set<int>::iterator p = import_bystanders[dir].begin();
- p != import_bystanders[dir].end();
+ import_state_t& stat = import_state[dir->dirfrag()];
+ for (set<int>::iterator p = stat.bystanders.begin();
+ p != stat.bystanders.end();
++p) {
MExportDirNotify *notify =
new MExportDirNotify(dir->dirfrag(), false,
- pair<int,int>(import_peer[dir->dirfrag()], mds->get_nodeid()),
+ pair<int,int>(stat.peer, mds->get_nodeid()),
pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
{
dout(7) << "import_notify_abort " << *dir << dendl;
- for (set<int>::iterator p = import_bystanders[dir].begin();
- p != import_bystanders[dir].end();
+ import_state_t& stat = import_state[dir->dirfrag()];
+ for (set<int>::iterator p = stat.bystanders.begin();
+ p != stat.bystanders.end();
++p) {
MExportDirNotify *notify =
new MExportDirNotify(dir->dirfrag(), true,
- pair<int,int>(import_peer[dir->dirfrag()], mds->get_nodeid()),
- pair<int,int>(import_peer[dir->dirfrag()], CDIR_AUTH_UNKNOWN));
+ pair<int,int>(stat.peer, mds->get_nodeid()),
+ pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
mds->send_message_mds(notify, *p);
// clean up
import_state.erase(dir->dirfrag());
- import_peer.erase(dir->dirfrag());
- import_bystanders.erase(dir);
- import_bound_ls.erase(dir);
- import_updated_scatterlocks.erase(dir);
- import_caps.erase(dir);
// send pending import_maps?
mds->mdcache->maybe_send_pending_resolves();
map<client_t,entity_inst_t>& imported_client_map,
map<client_t,uint64_t>& sseqmap)
{
- if (import_state.count(df) == 0 ||
- import_state[df] != IMPORT_LOGGINGSTART) {
+ map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
+ if (it == import_state.end() ||
+ it->second.state != IMPORT_LOGGINGSTART) {
dout(7) << "import " << df << " must have aborted" << dendl;
return;
}
dout(7) << "import_logged " << *dir << dendl;
// note state
- import_state[dir->dirfrag()] = IMPORT_ACKING;
+ import_state[dir->dirfrag()].state = IMPORT_ACKING;
assert (g_conf->mds_kill_import_at != 7);
// force open client sessions and finish cap import
mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
- for (map<CInode*, map<client_t,Capability::Export> >::iterator p = import_caps[dir].begin();
- p != import_caps[dir].end();
+ for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
+ p != it->second.peer_exports.end();
++p) {
finish_import_inode_caps(p->first, true, p->second);
}
import_remove_pins(dir, bounds);
- map<CInode*, map<client_t,Capability::Export> > cap_imports;
- import_caps[dir].swap(cap_imports);
+ map<CInode*, map<client_t,Capability::Export> > peer_exports;
+ import_state[dir->dirfrag()].peer_exports.swap(peer_exports);
// clear import state (we're done!)
import_state.erase(dir->dirfrag());
- import_peer.erase(dir->dirfrag());
- import_bystanders.erase(dir);
- import_bound_ls.erase(dir);
- import_caps.erase(dir);
- import_updated_scatterlocks.erase(dir);
mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
mds->queue_waiters(ls);
// re-eval imported caps
- for (map<CInode*, map<client_t,Capability::Export> >::iterator p = cap_imports.begin();
- p != cap_imports.end();
+ for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
+ p != peer_exports.end();
++p)
if (p->first->is_auth())
mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth,
LogSegment *ls, uint64_t log_offset,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+ map<CInode*, map<client_t,Capability::Export> >& peer_exports,
list<ScatterLock*>& updated_scatterlocks)
{
dout(15) << "decode_import_inode on " << *dn << dendl;
in->last_journaled = log_offset;
// caps
- decode_import_inode_caps(in, blp, cap_imports);
+ decode_import_inode_caps(in, blp, peer_exports);
// link before state -- or not! -sage
if (dn->get_linkage()->get_inode() != in) {
void Migrator::decode_import_inode_caps(CInode *in,
bufferlist::iterator &blp,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports)
+ map<CInode*, map<client_t,Capability::Export> >& peer_exports)
{
map<client_t,Capability::Export> cap_map;
::decode(cap_map, blp);
::decode(in->get_mds_caps_wanted(), blp);
if (!cap_map.empty() || !in->get_mds_caps_wanted().empty()) {
- cap_imports[in].swap(cap_map);
+ peer_exports[in].swap(cap_map);
in->get(CInode::PIN_IMPORTINGCAPS);
}
}
CDir *import_root,
EImportStart *le,
LogSegment *ls,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+ map<CInode*,map<client_t,Capability::Export> >& peer_exports,
list<ScatterLock*>& updated_scatterlocks, utime_t now)
{
// set up dir
else if (icode == 'I') {
// inode
assert(le);
- decode_import_inode(dn, blp, oldauth, ls, le->get_start_off(), cap_imports, updated_scatterlocks);
+ decode_import_inode(dn, blp, oldauth, ls, le->get_start_off(), peer_exports, updated_scatterlocks);
}
// add dentry to journal entry
CInode *in;
int from;
public:
- map<CInode*, map<client_t,Capability::Export> > cap_imports;
+ map<CInode*, map<client_t,Capability::Export> > peer_exports;
map<client_t,entity_inst_t> client_map;
map<client_t,uint64_t> sseqmap;
C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {}
void finish(int r) {
- migrator->logged_import_caps(in, from, cap_imports, client_map, sseqmap);
+ migrator->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
}
};
// decode new caps
bufferlist::iterator blp = ex->cap_bl.begin();
- decode_import_inode_caps(in, blp, finish->cap_imports);
- assert(!finish->cap_imports.empty()); // thus, inode is pinned.
+ decode_import_inode_caps(in, blp, finish->peer_exports);
+ assert(!finish->peer_exports.empty()); // thus, inode is pinned.
// journal open client sessions
version_t pv = mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
void Migrator::logged_import_caps(CInode *in,
int from,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports,
+ map<CInode*, map<client_t,Capability::Export> >& peer_exports,
map<client_t,entity_inst_t>& client_map,
map<client_t,uint64_t>& sseqmap)
{
// force open client sessions and finish cap import
mds->server->finish_force_open_sessions(client_map, sseqmap);
- assert(cap_imports.count(in));
- finish_import_inode_caps(in, false, cap_imports[in]);
+ assert(peer_exports.count(in));
+ finish_import_inode_caps(in, false, peer_exports[in]);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
mds->send_message_mds(new MExportCapsAck(in->ino()), from);