export_freeze_finish(dir);
dir->state_clear(CDir::STATE_EXPORTING);
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
break;
case EXPORT_FREEZING:
export_freeze_finish(dir);
dir->state_clear(CDir::STATE_EXPORTING);
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
break;
// NOTE: state order reversal, warning comes after prepping
export_unlock(dir);
dir->state_clear(CDir::STATE_EXPORTING);
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
break;
case EXPORT_EXPORTING:
export_state_t& stat = export_state[dir];
stat.state = EXPORT_DISCOVERING;
stat.peer = dest;
+ stat.tid = ++last_export_tid;
stat.locks.swap(locks);
dir->state_set(CDir::STATE_EXPORTING);
// send ExportDirDiscover (ask target)
filepath path;
dir->inode->make_path(path);
- mds->send_message_mds(new MExportDirDiscover(mds->get_nodeid(), path, dir->dirfrag()), dest);
+ MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
+ mds->get_nodeid(), stat.tid);
+ mds->send_message_mds(discover, dest);
assert(g_conf->mds_kill_export_at != 2);
// start the freeze, but hold it up with an auth_pin.
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
- it->second.state != EXPORT_DISCOVERING ||
+ it->second.tid != m->get_tid() ||
it->second.peer != m->get_source().num()) {
dout(7) << "must have aborted" << dendl;
} else {
+ assert(it->second.state == EXPORT_DISCOVERING);
// release locks to avoid deadlock
export_unlock(dir);
// freeze the subtree
dir->state_clear(CDir::STATE_EXPORTING);
mds->queue_waiters(it->second.waiting_for_finish);
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
export_state.erase(it);
return;
cache->get_subtree_bounds(dir, bounds);
// generate prep message, log entry.
- MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag());
+ MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag(), it->second.tid);
// include list of bystanders
for (map<int,int>::iterator p = dir->replicas_begin();
set<dirfrag_t> dirfrags_added;
// check bounds
- for (set<CDir*>::iterator it = bounds.begin();
- it != bounds.end();
- ++it) {
- CDir *bound = *it;
+ for (set<CDir*>::iterator p = bounds.begin();
+ p != bounds.end();
+ ++p) {
+ CDir *bound = *p;
// pin it.
bound->get(CDir::PIN_EXPORTBOUND);
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
- it->second.state != EXPORT_PREPPING) {
+ it->second.tid != m->get_tid() ||
+ it->second.peer != m->get_source().num()) {
// export must have aborted.
dout(7) << "export must have aborted" << dendl;
m->put();
return;
}
+ assert(it->second.state == EXPORT_PREPPING);
assert (g_conf->mds_kill_export_at != 5);
// send warnings
set<CDir*> bounds;
it->second.warning_ack_waiting.insert(p->first);
it->second.notify_ack_waiting.insert(p->first); // we'll eventually get a notifyack, too!
- MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
+ MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true,
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
pair<int,int>(mds->get_nodeid(),it->second.peer));
for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
class C_M_ExportGo : public Context {
Migrator *migrator;
CDir *dir;
+ uint64_t tid;
public:
- C_M_ExportGo(Migrator *m, CDir *d) : migrator(m), dir(d) {}
+ C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
+ migrator(m), dir(d), tid(t) {}
void finish(int r) {
- migrator->export_go_synced(dir);
+ migrator->export_go_synced(dir, tid);
}
};
dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
// first sync log to flush out e.g. any cap imports
- mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir));
+ mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, export_state[dir].tid));
mds->mdlog->flush();
}
-void Migrator::export_go_synced(CDir *dir)
+void Migrator::export_go_synced(CDir *dir, uint64_t tid)
{
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() ||
- it->second.state != EXPORT_WARNING) {
+ it->second.tid != tid) {
// export must have aborted.
dout(7) << "export must have aborted on " << dir << dendl;
return;
}
+ assert(it->second.state == EXPORT_WARNING);
dout(7) << "export_go_synced " << *dir << " to " << it->second.peer << dendl;
mds->balancer->subtract_export(dir, now);
// fill export message with cache data
- MExportDir *req = new MExportDir(dir->dirfrag());
+ MExportDir *req = new MExportDir(dir->dirfrag(), it->second.tid);
map<client_t,entity_inst_t> exported_client_map;
int num_exported_inodes = encode_export_dir(req->export_data,
dir, // recur start point
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
assert(it != export_state.end());
assert(it->second.state == EXPORT_EXPORTING);
+ assert(it->second.tid == m->get_tid());
it->second.state = EXPORT_LOGGINGFINISH;
assert (g_conf->mds_kill_export_at != 9);
for (set<int>::iterator p = stat.notify_ack_waiting.begin();
p != stat.notify_ack_waiting.end();
++p) {
- MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), false,
+ MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid,false,
pair<int,int>(mds->get_nodeid(),stat.peer),
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
for (set<int>::iterator p = stat.notify_ack_waiting.begin();
p != stat.notify_ack_waiting.end();
++p) {
- MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
+ MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), stat.tid, true,
pair<int,int>(mds->get_nodeid(), stat.peer),
pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
// send finish/commit to new auth
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
- mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), it->second.peer);
+ mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), it->second.tid), it->second.peer);
} else {
dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
}
// note import state
dirfrag_t df = m->get_dirfrag();
// only start discovering on this message once.
+ map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
if (!m->started) {
+ assert(it == import_state.end());
m->started = true;
- import_pending_msg[df] = m;
import_state[df].state = IMPORT_DISCOVERING;
import_state[df].peer = from;
+ import_state[df].tid = m->get_tid();
} else {
// am i retrying after ancient path_traverse results?
- if (import_pending_msg.count(df) == 0 || import_pending_msg[df] != m) {
+ if (it == import_state.end() ||
+ it->second.peer != from ||
+ it->second.tid != m->get_tid()) {
dout(7) << " dropping obsolete message" << dendl;
m->put();
return;
}
+ assert(it->second.state == IMPORT_DISCOVERING);
}
if (!mds->mdcache->is_open()) {
// yay
dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
- import_state[m->get_dirfrag()].state = IMPORT_DISCOVERED;
- import_pending_msg.erase(m->get_dirfrag());
+ import_state[df].state = IMPORT_DISCOVERED;
// pin inode in the cache (for now)
assert(in->is_dir());
// reply
dout(7) << " sending export_discover_ack on " << *in << dendl;
- mds->send_message_mds(new MExportDirDiscoverAck(df), import_state[df].peer);
+ mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), import_state[df].peer);
m->put();
assert (g_conf->mds_kill_import_at != 2);
}
void Migrator::import_reverse_discovering(dirfrag_t df)
{
- import_pending_msg.erase(df);
import_state.erase(df);
}
void Migrator::import_reverse_prepping(CDir *dir)
{
- import_pending_msg.erase(dir->dirfrag());
set<CDir*> bounds;
cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
import_remove_pins(dir, bounds);
map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
if (!m->did_assim()) {
assert(it != import_state.end());
+ assert(it->second.state == IMPORT_DISCOVERED);
diri = cache->get_inode(m->get_dirfrag().ino);
assert(diri);
bufferlist::iterator p = m->basedir.begin();
dir = cache->add_replica_dir(p, diri, oldauth, finished);
dout(7) << "handle_export_prep on " << *dir << " (first pass)" << dendl;
} else {
- if (import_pending_msg.count(m->get_dirfrag()) == 0 ||
- import_pending_msg[m->get_dirfrag()] != m) {
+ if (it == import_state.end() ||
+ it->second.peer != oldauth ||
+ it->second.tid != m->get_tid()) {
dout(7) << "handle_export_prep obsolete message, dropping" << dendl;
m->put();
return;
}
+ assert(it->second.state == IMPORT_PREPPING);
dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
if (!m->did_assim()) {
dout(7) << "doing assim on " << *dir << dendl;
m->mark_assim(); // only do this the first time!
- import_pending_msg[dir->dirfrag()] = m;
// change import state
it->second.state = IMPORT_PREPPING;
// ok!
dout(7) << " sending export_prep_ack on " << *dir << dendl;
- mds->send_message(new MExportDirPrepAck(dir->dirfrag()), m->get_connection());
+ mds->send_message(new MExportDirPrepAck(dir->dirfrag(), m->get_tid()), m->get_connection());
// note new state
it->second.state = IMPORT_PREPPED;
- import_pending_msg.erase(dir->dirfrag());
+
assert(g_conf->mds_kill_import_at != 4);
// done
m->put();
CDir *dir = cache->get_dirfrag(m->dirfrag);
assert(dir);
+ map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
+ assert(it != import_state.end());
+ assert(it->second.state == IMPORT_PREPPED);
+ assert(it->second.tid == m->get_tid());
+
utime_t now = ceph_clock_now(g_ceph_context);
int oldauth = m->get_source().num();
dout(7) << "handle_export_dir importing " << *dir << " from " << oldauth << dendl;
dir, // import root
le,
mds->mdlog->get_current_segment(),
- import_state[dir->dirfrag()].peer_exports,
- import_state[dir->dirfrag()].updated_scatterlocks,
+ it->second.peer_exports,
+ it->second.updated_scatterlocks,
now);
}
dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
dout(7) << "handle_export_dir did " << *dir << dendl;
// note state
- import_state[dir->dirfrag()].state = IMPORT_LOGGINGSTART;
+ it->second.state = IMPORT_LOGGINGSTART;
assert (g_conf->mds_kill_import_at != 6);
// log it
p != stat.bystanders.end();
++p) {
MExportDirNotify *notify =
- new MExportDirNotify(dir->dirfrag(), false,
+ new MExportDirNotify(dir->dirfrag(), stat.tid, false,
pair<int,int>(stat.peer, mds->get_nodeid()),
pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
p != stat.bystanders.end();
++p) {
MExportDirNotify *notify =
- new MExportDirNotify(dir->dirfrag(), true,
+ new MExportDirNotify(dir->dirfrag(), stat.tid, true,
pair<int,int>(stat.peer, mds->get_nodeid()),
pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
dout(7) << "import_logged " << *dir << dendl;
// note state
- import_state[dir->dirfrag()].state = IMPORT_ACKING;
+ it->second.state = IMPORT_ACKING;
assert (g_conf->mds_kill_import_at != 7);
// test surviving observer of a failed migration that did not complete
//assert(dir->replica_map.size() < 2 || mds->whoami != 0);
- mds->send_message_mds(new MExportDirAck(dir->dirfrag()), from);
+ mds->send_message_mds(new MExportDirAck(dir->dirfrag(), it->second.tid), from);
assert (g_conf->mds_kill_import_at != 8);
cache->show_subtrees();
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
dout(7) << "handle_export_finish on " << *dir << dendl;
+
+ map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
+ assert(it != import_state.end());
+ assert(it->second.state == IMPORT_ACKING);
+ assert(it->second.tid == m->get_tid());
+
import_finish(dir, false);
m->put();
}
// send ack
if (m->wants_ack()) {
- mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()), from);
+ mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag(), m->get_tid()), from);
} else {
// aborted. no ack.
dout(7) << "handle_export_notify no ack requested" << dendl;